Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trim local translog in peer recovery #44756

Merged
merged 15 commits into from
Aug 3, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
ActionListener.completeWith(listener, () -> {
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().openEngineAndSkipTranslogRecovery();
indexShard.trimOperationOfPreviousPrimaryTerms(indexShard.getLastSyncedGlobalCheckpoint());
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,7 @@ public void onFailure(Exception e) {
onFailureException.get(), hasToString(containsString("operation primary term [" + oldPrimaryTerm + "] is too old")));
}

closeShards(indexShard);
closeShard(indexShard, false); // skip asserting translog and Lucene as we rolled back Lucene but did not execute resync
}

public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.lucene.index.NoMergePolicy;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.UUIDs;
Expand All @@ -37,6 +38,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
Expand Down Expand Up @@ -405,4 +407,40 @@ public void testShouldFlushAfterPeerRecovery() throws Exception {
shards.assertAllEqual(numDocs);
}
}

/**
 * Verifies that a former primary that rejoins the replication group via peer recovery
 * trims from its local translog any operations stamped with a previous primary term
 * above the global checkpoint, so that all copies converge on the same history.
 * NOTE(review): relies on prepareForTranslogOperations calling
 * trimOperationOfPreviousPrimaryTerms during recovery — confirm against the production change.
 */
public void testTrimTranslogInPeerRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(between(1, 2))) {
shards.startAll();
IndexShard oldPrimary = shards.getPrimary();
// Seed the group with some fully replicated docs; optionally flush to vary commit state.
shards.indexDocs(scaledRandomIntBetween(1, 100));
if (randomBoolean()) {
shards.flush();
}
// Index "in-flight" docs that reach only a random subset of replicas, creating
// divergent translogs between copies under the old primary's term.
int inflightDocs = scaledRandomIntBetween(1, 100);
for (int i = 0; i < inflightDocs; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i).source("{}", XContentType.JSON);
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
for (IndexShard replica : randomSubsetOf(shards.getReplicas())) {
indexOnReplica(bulkShardRequest, shards, replica);
}
if (rarely()) {
shards.flush();
}
}
// Sync the global checkpoint, then fail over to a replica (new primary term begins).
shards.syncGlobalCheckpoint();
shards.promoteReplicaToPrimary(randomFrom(shards.getReplicas())).get();
// Demote the old primary and re-add it as a replica reusing its on-disk state,
// forcing a peer recovery that must trim stale old-term translog operations.
oldPrimary.close("demoted", false);
oldPrimary.store().close();
oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId());
shards.recoverReplica(oldPrimary);
// After recovery every replica must hold exactly the same docs/seq-nos as the primary.
final List<DocIdSeqNoAndSource> docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary());
for (IndexShard shard : shards.getReplicas()) {
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
}
// Promote the recovered old primary again; history must still be consistent group-wide,
// proving the stale operations were trimmed rather than merely masked.
shards.promoteReplicaToPrimary(oldPrimary).get();
for (IndexShard shard : shards) {
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1053,8 +1052,7 @@ public static List<DocIdSeqNoAndSource> getDocIds(Engine engine, boolean refresh
*/
public static List<Translog.Operation> readAllOperationsInLucene(Engine engine, MapperService mapper) throws IOException {
final List<Translog.Operation> operations = new ArrayList<>();
long maxSeqNo = Math.max(0, ((InternalEngine)engine).getLocalCheckpointTracker().getMaxSeqNo());
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, maxSeqNo, false)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, Long.MAX_VALUE, false)) {
Translog.Operation op;
while ((op = snapshot.next()) != null){
operations.add(op);
Expand All @@ -1071,15 +1069,11 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
|| (engine instanceof InternalEngine) == false) {
return;
}
final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo();
if (maxSeqNo < 0) {
return; // nothing to check
}
final Map<Long, Translog.Operation> translogOps = new HashMap<>();
final List<Translog.Operation> translogOps = new ArrayList<>();
try (Translog.Snapshot snapshot = EngineTestCase.getTranslog(engine).newSnapshot()) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
translogOps.put(op.seqNo(), op);
translogOps.add(op);
}
}
final Map<Long, Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper).stream()
Expand All @@ -1091,10 +1085,11 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
}
final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps);
for (Translog.Operation translogOp : translogOps.values()) {
for (Translog.Operation translogOp : translogOps) {
final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo());
if (luceneOp == null) {
if (minSeqNoToRetain <= translogOp.seqNo() && translogOp.seqNo() <= maxSeqNo) {
if (minSeqNoToRetain <= translogOp.seqNo()) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo();
fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " +
"retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard repl
/**
* indexes the given requests on the supplied primary, modifying it for replicas
*/
BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
public BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
return executeReplicationRequestOnPrimary(primary, request);
}

Expand All @@ -826,7 +826,7 @@ BulkShardRequest deleteOnPrimary(DeleteRequest request, IndexShard primary) thro
/**
* indexes the given requests on the supplied replica shard
*/
void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
public void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
indexOnReplica(request, group, replica, group.primary.getPendingPrimaryTerm());
}

Expand Down