Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust log and unmute testFailOverOnFollower #38762

Merged
merged 1 commit into from
Feb 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,8 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
try {
if (logger.isTraceEnabled()) {
// don't use index.source().utf8ToString() here source might not be valid UTF-8
logger.trace("index [{}][{}] (seq# [{}])", index.type(), index.id(), index.seqNo());
logger.trace("index [{}][{}] seq# [{}] allocation-id {}",
index.type(), index.id(), index.seqNo(), routingEntry().allocationId());
}
result = engine.index(index);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -997,8 +997,9 @@ public static List<DocIdSeqNoAndTerm> getDocIds(Engine engine, boolean refresh)
}
}
}
docs.sort(Comparator.comparing(DocIdSeqNoAndTerm::getId)
.thenComparingLong(DocIdSeqNoAndTerm::getSeqNo).thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm));
docs.sort(Comparator.comparingLong(DocIdSeqNoAndTerm::getSeqNo)
.thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm)
.thenComparing((DocIdSeqNoAndTerm::getId)));
return docs;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -452,8 +453,18 @@ protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String f
logger.info("--> asserting <<docId,seqNo>> between {} and {}", leaderIndex, followerIndex);
assertBusy(() -> {
Map<Integer, List<DocIdSeqNoAndTerm>> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex);
logger.info("--> docs on the follower {}", docsOnFollower);
assertThat(docsOnFollower, equalTo(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex)));
Map<Integer, List<DocIdSeqNoAndTerm>> docsOnLeader = getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex);
Map<Integer, Set<DocIdSeqNoAndTerm>> mismatchedDocs = new HashMap<>();
for (Map.Entry<Integer, List<DocIdSeqNoAndTerm>> fe : docsOnFollower.entrySet()) {
Set<DocIdSeqNoAndTerm> d1 = Sets.difference(
Sets.newHashSet(fe.getValue()), Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())));
Set<DocIdSeqNoAndTerm> d2 = Sets.difference(
Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())), Sets.newHashSet(fe.getValue()));
if (d1.isEmpty() == false || d2.isEmpty() == false) {
mismatchedDocs.put(fe.getKey(), Sets.union(d1, d2));
}
}
assertThat("mismatched documents [" + mismatchedDocs + "]", docsOnFollower, equalTo(docsOnLeader));
}, 120, TimeUnit.SECONDS);

logger.info("--> asserting seq_no_stats between {} and {}", leaderIndex, followerIndex);
Expand Down Expand Up @@ -482,13 +493,15 @@ private Map<Integer, List<DocIdSeqNoAndTerm>> getDocIdAndSeqNos(InternalTestClus
Randomness.shuffle(shardRoutings);
final Map<Integer, List<DocIdSeqNoAndTerm>> docs = new HashMap<>();
for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting == null || shardRouting.assignedToNode() == false || docs.containsKey(shardRouting.shardId().id())) {
if (shardRouting == null || shardRouting.assignedToNode() == false) {
continue;
}
IndexShard indexShard = cluster.getInstance(IndicesService.class, state.nodes().get(shardRouting.currentNodeId()).getName())
.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id());
try {
docs.put(shardRouting.shardId().id(), IndexShardTestCase.getDocIdAndSeqNos(indexShard).stream()
final List<DocIdSeqNoAndTerm> docsOnShard = IndexShardTestCase.getDocIdAndSeqNos(indexShard);
logger.info("--> shard {} docs {} seq_no_stats {}", shardRouting, docsOnShard, indexShard.seqNoStats());
docs.put(shardRouting.shardId().id(), docsOnShard.stream()
// normalize primary term as the follower use its own term
.map(d -> new DocIdSeqNoAndTerm(d.getId(), d.getSeqNo(), 1L))
.collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,24 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.index.shard:DEBUG")
@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.xpack.ccr.action.ShardChangesAction:DEBUG,"
+ "org.elasticsearch.index.shard:TRACE")
public class FollowerFailOverIT extends CcrIntegTestCase {

@Override
protected boolean reuseClusters() {
return false;
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/38633")
public void testFailOverOnFollower() throws Exception {
final String leaderIndex = "leader_test_failover";
final String followerIndex = "follower_test_failover";
int numberOfReplicas = between(1, 2);
getFollowerCluster().startMasterOnlyNode();
getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2));
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
AtomicBoolean stopped = new AtomicBoolean();
Thread[] threads = new Thread[between(1, 8)];
AtomicInteger docID = new AtomicInteger();
Expand All @@ -77,20 +79,20 @@ public void testFailOverOnFollower() throws Exception {
}
if (frequently()) {
String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update
IndexResponse indexResponse = leaderClient().prepareIndex("leader-index", "doc", id)
IndexResponse indexResponse = leaderClient().prepareIndex(leaderIndex, "doc", id)
.setSource("{\"f\":" + id + "}", XContentType.JSON).get();
logger.info("--> index id={} seq_no={}", indexResponse.getId(), indexResponse.getSeqNo());
logger.info("--> index {} id={} seq_no={}", leaderIndex, indexResponse.getId(), indexResponse.getSeqNo());
} else {
String id = Integer.toString(between(0, docID.get()));
DeleteResponse deleteResponse = leaderClient().prepareDelete("leader-index", "doc", id).get();
logger.info("--> delete id={} seq_no={}", deleteResponse.getId(), deleteResponse.getSeqNo());
DeleteResponse deleteResponse = leaderClient().prepareDelete(leaderIndex, "doc", id).get();
logger.info("--> delete {} id={} seq_no={}", leaderIndex, deleteResponse.getId(), deleteResponse.getSeqNo());
}
}
});
threads[i].start();
}
availableDocs.release(between(100, 200));
PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
PutFollowAction.Request follow = putFollow(leaderIndex, followerIndex);
follow.getParameters().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
follow.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
follow.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
Expand All @@ -99,27 +101,27 @@ public void testFailOverOnFollower() throws Exception {
follow.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
logger.info("--> follow request {}", Strings.toString(follow));
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
disableDelayedAllocation("follower-index");
ensureFollowerGreen("follower-index");
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(30, 80));
disableDelayedAllocation(followerIndex);
ensureFollowerGreen(followerIndex);
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex(followerIndex), 0), between(30, 80));
final ClusterState clusterState = getFollowerCluster().clusterService().state();
for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) {
for (ShardRouting shardRouting : clusterState.routingTable().allShards(followerIndex)) {
if (shardRouting.primary()) {
DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
getFollowerCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback());
break;
}
}
availableDocs.release(between(50, 200));
ensureFollowerGreen("follower-index");
ensureFollowerGreen(followerIndex);
availableDocs.release(between(50, 200));
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(100, 150));
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex(followerIndex), 0), between(100, 150));
stopped.set(true);
for (Thread thread : threads) {
thread.join();
}
assertIndexFullyReplicatedToFollower("leader-index", "follower-index");
pauseFollow("follower-index");
assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex);
pauseFollow(followerIndex);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33337")
Expand Down