Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Fix flaky tests in SegmentReplicationPressureIT #6868

Merged
merged 1 commit into from
Mar 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.replication.SegmentReplicationBaseIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -52,12 +53,24 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

@Override
public Settings indexSettings() {
// we want to control refreshes
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.refresh_interval", -1)
.build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6671")
public void testWritesRejected() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
Expand All @@ -76,6 +89,10 @@ public void testWritesRejected() throws Exception {
indexingThread.start();
indexingThread.join();
latch.await();

indexDoc();
totalDocs.incrementAndGet();
refresh(INDEX_NAME);
// index again while we are stale.
assertBusy(() -> {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
Expand All @@ -90,6 +107,7 @@ public void testWritesRejected() throws Exception {

// index another doc showing there is no pressure enforced.
indexDoc();
refresh(INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}
Expand All @@ -98,7 +116,6 @@ public void testWritesRejected() throws Exception {
* This test ensures that a replica can be added while the index is under write block.
* Ensuring that only write requests are blocked.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6671")
public void testAddReplicaWhileWritesBlocked() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
Expand All @@ -118,6 +135,9 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {
indexingThread.start();
indexingThread.join();
latch.await();
indexDoc();
totalDocs.incrementAndGet();
refresh(INDEX_NAME);
// index again while we are stale.
assertBusy(() -> {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
Expand All @@ -142,6 +162,7 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {

// index another doc showing there is no pressure enforced.
indexDoc();
refresh(INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}
Expand Down Expand Up @@ -258,7 +279,7 @@ private void assertFailedRequests(BulkResponse response) {
}

private void indexDoc() {
client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").get();
client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").execute().actionGet();
}

private void assertEqualSegmentInfosVersion(List<String> replicaNames, IndexShard primaryShard) {
Expand Down