-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Skip performOnPrimary step when executing PublishCheckpoint. #6366
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,9 +21,7 @@ | |
import org.opensearch.common.Priority; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.index.IndexModule; | ||
import org.opensearch.indices.IndicesService; | ||
import org.opensearch.indices.replication.common.ReplicationType; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
import org.opensearch.test.transport.MockTransportService; | ||
import org.opensearch.transport.TransportService; | ||
|
@@ -33,6 +31,7 @@ | |
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; | ||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
|
||
/** | ||
|
@@ -43,22 +42,13 @@ public class SegmentReplicationRelocationIT extends SegmentReplicationBaseIT { | |
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); | ||
|
||
private void createIndex(int replicaCount) { | ||
prepareCreate( | ||
INDEX_NAME, | ||
Settings.builder() | ||
.put("index.number_of_shards", 1) | ||
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) | ||
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) | ||
.put("index.number_of_replicas", replicaCount) | ||
.put("index.refresh_interval", -1) | ||
).get(); | ||
prepareCreate(INDEX_NAME, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)).get(); | ||
} | ||
|
||
/** | ||
* This test verifies the happy path when the primary shard is relocated to a newly added node (target) in the cluster. | ||
* Documents are indexed both before and after relocation, and the documents are verified. | ||
*/ | ||
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: From a completeness perspective, do we need relocation tests for There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, good idea. I've added randomness to select a refresh policy in all of the tests in this class other than the explicit wait_until test. |
||
public void testPrimaryRelocation() throws Exception { | ||
final String oldPrimary = internalCluster().startNode(); | ||
createIndex(1); | ||
|
@@ -135,7 +125,6 @@ public void testPrimaryRelocation() throws Exception { | |
* failure, more documents are ingested and verified on replica; which confirms older primary still refreshing the | ||
* replicas. | ||
*/ | ||
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") | ||
public void testPrimaryRelocationWithSegRepFailure() throws Exception { | ||
final String oldPrimary = internalCluster().startNode(); | ||
createIndex(1); | ||
|
@@ -220,7 +209,6 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { | |
* This test verifies primary recovery behavior with continuous ingestion | ||
* | ||
*/ | ||
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") | ||
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { | ||
final String primary = internalCluster().startNode(); | ||
createIndex(1); | ||
|
@@ -297,7 +285,6 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E | |
* operations during handoff. The test verifies all docs ingested are searchable on new primary. | ||
* | ||
*/ | ||
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") | ||
public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { | ||
final String primary = internalCluster().startNode(); | ||
createIndex(1); | ||
|
@@ -396,7 +383,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { | |
assertBusy(() -> { | ||
client().admin().indices().prepareRefresh().execute().actionGet(); | ||
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); | ||
}, 1, TimeUnit.MINUTES); | ||
}, 2, TimeUnit.MINUTES); | ||
flushAndRefresh(INDEX_NAME); | ||
waitForSearchableDocs(totalDocCount, replica, newPrimary); | ||
verifyStoreContent(); | ||
|
@@ -406,13 +393,10 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { | |
* This test verifies that adding a new node, which results in peer recovery as a replica, also brings the replica's | ||
* replication checkpoint up to the primary's by performing a round of segment replication. | ||
*/ | ||
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") | ||
public void testNewlyAddedReplicaIsUpdated() throws Exception { | ||
final String primary = internalCluster().startNode(); | ||
prepareCreate( | ||
INDEX_NAME, | ||
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
).get(); | ||
prepareCreate(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)) | ||
.get(); | ||
for (int i = 0; i < 10; i++) { | ||
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); | ||
} | ||
|
@@ -430,10 +414,7 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception { | |
ensureGreen(INDEX_NAME); | ||
// Update replica count settings to 1 so that peer recovery triggers and recover replica | ||
assertAcked( | ||
client().admin() | ||
.indices() | ||
.prepareUpdateSettings(INDEX_NAME) | ||
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) | ||
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1)) | ||
); | ||
|
||
ClusterHealthResponse clusterHealthResponse = client().admin() | ||
|
@@ -454,18 +435,15 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception { | |
|
||
/** | ||
* This test verifies that a replica shard is not added to the cluster when a round of segment replication fails during peer recovery. | ||
* | ||
* TODO: Ignoring this test as it's flaky and needs a separate fix | ||
*/ | ||
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") | ||
public void testAddNewReplicaFailure() throws Exception { | ||
logger.info("--> starting [Primary Node] ..."); | ||
final String primaryNode = internalCluster().startNode(); | ||
|
||
logger.info("--> creating test index ..."); | ||
prepareCreate( | ||
INDEX_NAME, | ||
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0) | ||
|
||
).get(); | ||
|
||
|
@@ -505,10 +483,7 @@ public void testAddNewReplicaFailure() throws Exception { | |
ensureGreen(INDEX_NAME); | ||
// Add Replica shard to the new empty replica node | ||
assertAcked( | ||
client().admin() | ||
.indices() | ||
.prepareUpdateSettings(INDEX_NAME) | ||
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) | ||
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1)) | ||
); | ||
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replica); | ||
waitForRecovery.await(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,12 +13,18 @@ | |
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.apache.lucene.store.AlreadyClosedException; | ||
import org.opensearch.ExceptionsHelper; | ||
import org.opensearch.OpenSearchException; | ||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.ActionListenerResponseHandler; | ||
import org.opensearch.action.support.ActionFilters; | ||
import org.opensearch.action.support.replication.ReplicationMode; | ||
import org.opensearch.action.support.replication.ReplicationOperation; | ||
import org.opensearch.action.support.replication.ReplicationResponse; | ||
import org.opensearch.action.support.replication.ReplicationTask; | ||
import org.opensearch.action.support.replication.TransportReplicationAction; | ||
import org.opensearch.cluster.action.shard.ShardStateAction; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.cluster.routing.ShardRouting; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.inject.Inject; | ||
import org.opensearch.common.io.stream.StreamInput; | ||
|
@@ -33,15 +39,12 @@ | |
import org.opensearch.node.NodeClosedException; | ||
import org.opensearch.tasks.Task; | ||
import org.opensearch.threadpool.ThreadPool; | ||
import org.opensearch.transport.TransportException; | ||
import org.opensearch.transport.TransportResponseHandler; | ||
import org.opensearch.transport.TransportService; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Objects; | ||
|
||
import org.opensearch.action.support.replication.ReplicationMode; | ||
|
||
/** | ||
* Replication action responsible for publishing checkpoint to a replica shard. | ||
* | ||
|
@@ -107,36 +110,33 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) { | |
* Publish checkpoint request to shard | ||
*/ | ||
final void publish(IndexShard indexShard) { | ||
String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); | ||
long primaryTerm = indexShard.getPendingPrimaryTerm(); | ||
final ThreadContext threadContext = threadPool.getThreadContext(); | ||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { | ||
// we have to execute under the system context so that if security is enabled the sync is authorized | ||
threadContext.markAsSystemContext(); | ||
PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint()); | ||
final ReplicationCheckpoint checkpoint = request.getCheckpoint(); | ||
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); | ||
final ReplicationTimer timer = new ReplicationTimer(); | ||
timer.start(); | ||
transportService.sendChildRequest( | ||
clusterService.localNode(), | ||
transportPrimaryAction, | ||
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), | ||
task, | ||
transportOptions, | ||
new TransportResponseHandler<ReplicationResponse>() { | ||
@Override | ||
public ReplicationResponse read(StreamInput in) throws IOException { | ||
return newResponseInstance(in); | ||
} | ||
|
||
@Override | ||
public String executor() { | ||
return ThreadPool.Names.SAME; | ||
} | ||
|
||
final List<ShardRouting> replicationTargets = indexShard.getReplicationGroup().getReplicationTargets(); | ||
for (ShardRouting replicationTarget : replicationTargets) { | ||
if (replicationTarget.primary()) { | ||
continue; | ||
} | ||
final DiscoveryNode node = clusterService.state().nodes().get(replicationTarget.currentNodeId()); | ||
final ConcreteReplicaRequest<PublishCheckpointRequest> replicaRequest = new ConcreteReplicaRequest<>( | ||
request, | ||
replicationTarget.allocationId().getId(), | ||
primaryTerm, | ||
indexShard.getLastKnownGlobalCheckpoint(), | ||
indexShard.getMaxSeqNoOfUpdatesOrDeletes() | ||
); | ||
final ReplicationTimer timer = new ReplicationTimer(); | ||
timer.start(); | ||
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); | ||
ActionListener<ReplicationOperation.ReplicaResponse> listener = new ActionListener<>() { | ||
@Override | ||
public void handleResponse(ReplicationResponse response) { | ||
public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) { | ||
timer.stop(); | ||
logger.trace( | ||
() -> new ParameterizedMessage( | ||
|
@@ -146,12 +146,11 @@ public void handleResponse(ReplicationResponse response) { | |
timer.time() | ||
) | ||
); | ||
task.setPhase("finished"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why this There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not intentional, thanks for catching this; it is useful in tracking state in the |
||
taskManager.unregister(task); | ||
} | ||
|
||
@Override | ||
public void handleException(TransportException e) { | ||
public void onFailure(Exception e) { | ||
timer.stop(); | ||
logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time()); | ||
task.setPhase("finished"); | ||
|
@@ -174,8 +173,13 @@ public void handleException(TransportException e) { | |
e | ||
); | ||
} | ||
} | ||
); | ||
}; | ||
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>( | ||
listener, | ||
ReplicaResponse::new | ||
); | ||
transportService.sendChildRequest(node, transportReplicaAction, replicaRequest, task, transportOptions, handler); | ||
} | ||
logger.trace( | ||
() -> new ParameterizedMessage( | ||
"[shardId {}] Publishing replication checkpoint [{}]", | ||
|
@@ -192,7 +196,7 @@ protected void shardOperationOnPrimary( | |
IndexShard primary, | ||
ActionListener<PrimaryResult<PublishCheckpointRequest, ReplicationResponse>> listener | ||
) { | ||
ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse())); | ||
throw new OpenSearchException("PublishCheckpointAction should not hit primary shards"); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍