Skip to content

Commit

Permalink
Adjust log and unmute testFailOverOnFollower (#38762)
Browse files Browse the repository at this point in the history
There were two documents (seq=2 and seq=103) missing on the follower in
one of the failures of `testFailOverOnFollower`. I spent several hours
on that failure but could not figure out the reason. This commit adjusts the
logging and unmutes the test so we can collect more information.

Relates #38633
  • Loading branch information
dnhatn committed Feb 23, 2019
1 parent 22e27c3 commit 7b6431f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,8 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
try {
if (logger.isTraceEnabled()) {
// don't use index.source().utf8ToString() here; the source might not be valid UTF-8
logger.trace("index [{}][{}] (seq# [{}])", index.type(), index.id(), index.seqNo());
logger.trace("index [{}][{}] seq# [{}] allocation-id {}",
index.type(), index.id(), index.seqNo(), routingEntry().allocationId());
}
result = engine.index(index);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1002,8 +1002,9 @@ public static List<DocIdSeqNoAndTerm> getDocIds(Engine engine, boolean refresh)
}
}
}
docs.sort(Comparator.comparing(DocIdSeqNoAndTerm::getId)
.thenComparingLong(DocIdSeqNoAndTerm::getSeqNo).thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm));
docs.sort(Comparator.comparingLong(DocIdSeqNoAndTerm::getSeqNo)
.thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm)
.thenComparing((DocIdSeqNoAndTerm::getId)));
return docs;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -477,8 +478,18 @@ protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String f
logger.info("--> asserting <<docId,seqNo>> between {} and {}", leaderIndex, followerIndex);
assertBusy(() -> {
Map<Integer, List<DocIdSeqNoAndTerm>> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex);
logger.info("--> docs on the follower {}", docsOnFollower);
assertThat(docsOnFollower, equalTo(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex)));
Map<Integer, List<DocIdSeqNoAndTerm>> docsOnLeader = getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex);
Map<Integer, Set<DocIdSeqNoAndTerm>> mismatchedDocs = new HashMap<>();
for (Map.Entry<Integer, List<DocIdSeqNoAndTerm>> fe : docsOnFollower.entrySet()) {
Set<DocIdSeqNoAndTerm> d1 = Sets.difference(
Sets.newHashSet(fe.getValue()), Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())));
Set<DocIdSeqNoAndTerm> d2 = Sets.difference(
Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())), Sets.newHashSet(fe.getValue()));
if (d1.isEmpty() == false || d2.isEmpty() == false) {
mismatchedDocs.put(fe.getKey(), Sets.union(d1, d2));
}
}
assertThat("mismatched documents [" + mismatchedDocs + "]", docsOnFollower, equalTo(docsOnLeader));
}, 120, TimeUnit.SECONDS);

logger.info("--> asserting seq_no_stats between {} and {}", leaderIndex, followerIndex);
Expand Down Expand Up @@ -507,13 +518,15 @@ private Map<Integer, List<DocIdSeqNoAndTerm>> getDocIdAndSeqNos(InternalTestClus
Randomness.shuffle(shardRoutings);
final Map<Integer, List<DocIdSeqNoAndTerm>> docs = new HashMap<>();
for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting == null || shardRouting.assignedToNode() == false || docs.containsKey(shardRouting.shardId().id())) {
if (shardRouting == null || shardRouting.assignedToNode() == false) {
continue;
}
IndexShard indexShard = cluster.getInstance(IndicesService.class, state.nodes().get(shardRouting.currentNodeId()).getName())
.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id());
try {
docs.put(shardRouting.shardId().id(), IndexShardTestCase.getDocIdAndSeqNos(indexShard).stream()
final List<DocIdSeqNoAndTerm> docsOnShard = IndexShardTestCase.getDocIdAndSeqNos(indexShard);
logger.info("--> shard {} docs {} seq_no_stats {}", shardRouting, docsOnShard, indexShard.seqNoStats());
docs.put(shardRouting.shardId().id(), docsOnShard.stream()
// normalize the primary term, as the follower uses its own term
.map(d -> new DocIdSeqNoAndTerm(d.getId(), d.getSeqNo(), 1L))
.collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,24 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.index.shard:DEBUG")
@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.xpack.ccr.action.ShardChangesAction:DEBUG,"
+ "org.elasticsearch.index.shard:TRACE")
public class FollowerFailOverIT extends CcrIntegTestCase {

@Override
protected boolean reuseClusters() {
return false;
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38633")
public void testFailOverOnFollower() throws Exception {
final String leaderIndex = "leader_test_failover";
final String followerIndex = "follower_test_failover";
int numberOfReplicas = between(1, 2);
getFollowerCluster().startMasterOnlyNode();
getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2));
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
AtomicBoolean stopped = new AtomicBoolean();
Thread[] threads = new Thread[between(1, 8)];
AtomicInteger docID = new AtomicInteger();
Expand All @@ -77,20 +79,20 @@ public void testFailOverOnFollower() throws Exception {
}
if (frequently()) {
String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update
IndexResponse indexResponse = leaderClient().prepareIndex("leader-index", "doc", id)
IndexResponse indexResponse = leaderClient().prepareIndex(leaderIndex, "doc", id)
.setSource("{\"f\":" + id + "}", XContentType.JSON).get();
logger.info("--> index id={} seq_no={}", indexResponse.getId(), indexResponse.getSeqNo());
logger.info("--> index {} id={} seq_no={}", leaderIndex, indexResponse.getId(), indexResponse.getSeqNo());
} else {
String id = Integer.toString(between(0, docID.get()));
DeleteResponse deleteResponse = leaderClient().prepareDelete("leader-index", "doc", id).get();
logger.info("--> delete id={} seq_no={}", deleteResponse.getId(), deleteResponse.getSeqNo());
DeleteResponse deleteResponse = leaderClient().prepareDelete(leaderIndex, "doc", id).get();
logger.info("--> delete {} id={} seq_no={}", leaderIndex, deleteResponse.getId(), deleteResponse.getSeqNo());
}
}
});
threads[i].start();
}
availableDocs.release(between(100, 200));
PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
PutFollowAction.Request follow = putFollow(leaderIndex, followerIndex);
follow.getParameters().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
follow.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
follow.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
Expand All @@ -99,27 +101,27 @@ public void testFailOverOnFollower() throws Exception {
follow.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
logger.info("--> follow request {}", Strings.toString(follow));
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
disableDelayedAllocation("follower-index");
ensureFollowerGreen("follower-index");
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(30, 80));
disableDelayedAllocation(followerIndex);
ensureFollowerGreen(followerIndex);
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex(followerIndex), 0), between(30, 80));
final ClusterState clusterState = getFollowerCluster().clusterService().state();
for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) {
for (ShardRouting shardRouting : clusterState.routingTable().allShards(followerIndex)) {
if (shardRouting.primary()) {
DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
getFollowerCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback());
break;
}
}
availableDocs.release(between(50, 200));
ensureFollowerGreen("follower-index");
ensureFollowerGreen(followerIndex);
availableDocs.release(between(50, 200));
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(100, 150));
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex(followerIndex), 0), between(100, 150));
stopped.set(true);
for (Thread thread : threads) {
thread.join();
}
assertIndexFullyReplicatedToFollower("leader-index", "follower-index");
pauseFollow("follower-index");
assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex);
pauseFollow(followerIndex);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33337")
Expand Down

0 comments on commit 7b6431f

Please sign in to comment.