From 300a4f6d3e21dab5c213e7fd93803e3f2444982a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 11 Feb 2019 22:45:25 -0500 Subject: [PATCH] Adjust log and unmute testFailOverOnFollower --- .../elasticsearch/index/shard/IndexShard.java | 3 +- .../index/engine/EngineTestCase.java | 5 +-- .../elasticsearch/xpack/CcrIntegTestCase.java | 21 +++++++++--- .../xpack/ccr/FollowerFailOverIT.java | 34 ++++++++++--------- 4 files changed, 40 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 63307af0ac67c..726678fb9a6f8 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -778,7 +778,8 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc try { if (logger.isTraceEnabled()) { // don't use index.source().utf8ToString() here source might not be valid UTF-8 - logger.trace("index [{}][{}] (seq# [{}])", index.type(), index.id(), index.seqNo()); + logger.trace("index [{}][{}] seq# [{}] allocation-id {}", + index.type(), index.id(), index.seqNo(), routingEntry().allocationId()); } result = engine.index(index); } catch (Exception e) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 855f1b2e2fd72..005bfb42f8a22 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -997,8 +997,9 @@ public static List getDocIds(Engine engine, boolean refresh) } } } - docs.sort(Comparator.comparing(DocIdSeqNoAndTerm::getId) - .thenComparingLong(DocIdSeqNoAndTerm::getSeqNo).thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm)); + docs.sort(Comparator.comparingLong(DocIdSeqNoAndTerm::getSeqNo) + .thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm) + .thenComparing((DocIdSeqNoAndTerm::getId))); return docs; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index e8c70a8fce4ba..d8c90e94c827b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; @@ -452,8 +453,18 @@ protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String f logger.info("--> asserting <> between {} and {}", leaderIndex, followerIndex); assertBusy(() -> { Map> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex); - logger.info("--> docs on the follower {}", docsOnFollower); - assertThat(docsOnFollower, equalTo(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex))); + Map> docsOnLeader = getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex); + Map> mismatchedDocs = new HashMap<>(); + for (Map.Entry> fe : docsOnFollower.entrySet()) { + Set d1 = Sets.difference( + Sets.newHashSet(fe.getValue()), Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList()))); + Set d2 = Sets.difference( + Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())), Sets.newHashSet(fe.getValue())); + if (d1.isEmpty() == false || d2.isEmpty() == false) { + mismatchedDocs.put(fe.getKey(), Sets.union(d1, d2)); + } + } + assertThat("mismatched documents [" + mismatchedDocs + "]", docsOnFollower, equalTo(docsOnLeader)); }, 120, TimeUnit.SECONDS); logger.info("--> asserting seq_no_stats between {} and {}", leaderIndex, followerIndex); @@ -482,13 +493,15 @@ private Map> getDocIdAndSeqNos(InternalTestClus Randomness.shuffle(shardRoutings); final Map> docs = new HashMap<>(); for (ShardRouting shardRouting : shardRoutings) { - if (shardRouting == null || shardRouting.assignedToNode() == false || docs.containsKey(shardRouting.shardId().id())) { + if (shardRouting == null || shardRouting.assignedToNode() == false) { continue; } IndexShard indexShard = cluster.getInstance(IndicesService.class, state.nodes().get(shardRouting.currentNodeId()).getName()) .indexServiceSafe(shardRouting.index()).getShard(shardRouting.id()); try { - docs.put(shardRouting.shardId().id(), IndexShardTestCase.getDocIdAndSeqNos(indexShard).stream() + final List docsOnShard = IndexShardTestCase.getDocIdAndSeqNos(indexShard); + logger.info("--> shard {} docs {} seq_no_stats {}", shardRouting, docsOnShard, indexShard.seqNoStats()); + docs.put(shardRouting.shardId().id(), docsOnShard.stream() // normalize primary term as the follower use its own term .map(d -> new DocIdSeqNoAndTerm(d.getId(), d.getSeqNo(), 1L)) .collect(Collectors.toList())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java index 2adbc36107ac4..d2df76e047c21 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java @@ -45,7 +45,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.index.shard:DEBUG") +@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.xpack.ccr.action.ShardChangesAction:DEBUG," + + "org.elasticsearch.index.shard:TRACE") public class FollowerFailOverIT extends CcrIntegTestCase { @Override @@ -53,14 +54,15 @@ protected boolean reuseClusters() { return false; } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/38633") public void testFailOverOnFollower() throws Exception { + final String leaderIndex = "leader_test_failover"; + final String followerIndex = "follower_test_failover"; int numberOfReplicas = between(1, 2); getFollowerCluster().startMasterOnlyNode(); getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2)); String leaderIndexSettings = getIndexSettings(1, numberOfReplicas, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); - assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON)); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); AtomicBoolean stopped = new AtomicBoolean(); Thread[] threads = new Thread[between(1, 8)]; AtomicInteger docID = new AtomicInteger(); @@ -77,20 +79,20 @@ public void testFailOverOnFollower() throws Exception { } if (frequently()) { String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update - IndexResponse indexResponse = leaderClient().prepareIndex("leader-index", "doc", id) + IndexResponse indexResponse = leaderClient().prepareIndex(leaderIndex, "doc", id) .setSource("{\"f\":" + id + "}", XContentType.JSON).get(); - logger.info("--> index id={} seq_no={}", indexResponse.getId(), indexResponse.getSeqNo()); + logger.info("--> index {} id={} seq_no={}", leaderIndex, indexResponse.getId(), indexResponse.getSeqNo()); } else { String id = Integer.toString(between(0, docID.get())); - DeleteResponse deleteResponse = leaderClient().prepareDelete("leader-index", "doc", id).get(); - logger.info("--> delete id={} seq_no={}", deleteResponse.getId(), deleteResponse.getSeqNo()); + DeleteResponse deleteResponse = leaderClient().prepareDelete(leaderIndex, "doc", id).get(); + logger.info("--> delete {} id={} seq_no={}", leaderIndex, deleteResponse.getId(), deleteResponse.getSeqNo()); } } }); threads[i].start(); } availableDocs.release(between(100, 200)); - PutFollowAction.Request follow = putFollow("leader-index", "follower-index"); + PutFollowAction.Request follow = putFollow(leaderIndex, followerIndex); follow.getParameters().setMaxReadRequestOperationCount(randomIntBetween(32, 2048)); follow.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); follow.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10)); @@ -99,11 +101,11 @@ public void testFailOverOnFollower() throws Exception { follow.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10)); logger.info("--> follow request {}", Strings.toString(follow)); followerClient().execute(PutFollowAction.INSTANCE, follow).get(); - disableDelayedAllocation("follower-index"); - ensureFollowerGreen("follower-index"); - awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(30, 80)); + disableDelayedAllocation(followerIndex); + ensureFollowerGreen(followerIndex); + awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex(followerIndex), 0), between(30, 80)); final ClusterState clusterState = getFollowerCluster().clusterService().state(); - for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) { + for (ShardRouting shardRouting : clusterState.routingTable().allShards(followerIndex)) { if (shardRouting.primary()) { DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId()); getFollowerCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback()); @@ -111,15 +113,15 @@ public void testFailOverOnFollower() throws Exception { } } availableDocs.release(between(50, 200)); - ensureFollowerGreen("follower-index"); + ensureFollowerGreen(followerIndex); availableDocs.release(between(50, 200)); - awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(100, 150)); + awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex(followerIndex), 0), between(100, 150)); stopped.set(true); for (Thread thread : threads) { thread.join(); } - assertIndexFullyReplicatedToFollower("leader-index", "follower-index"); - pauseFollow("follower-index"); + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + pauseFollow(followerIndex); } @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33337")