Skip to content

Commit

Permalink
Keep one commit whose max_seqno is no_ops_performed
Browse files Browse the repository at this point in the history
If a 6.x node with a 5.x index is promoted to be a primary, it will flush a new
index commit to make sure translog operations without seqno will never be
replayed (see IndexShard#updateShardState). However the global checkpoint is
still UNASSIGNED and the max_seqno of both commits are NO_OPS_PERFORMED. If the
combined deletion policy considers the first commit as a safe commit, we will
send the first commit without replaying translog between these commits to the
replica in a peer-recovery. This causes the replica missing those operations.
To prevent this, we should not keep more than one commit whose max_seqno is
NO_OPS_PERFORMED. Once we can retain a safe commit, a NO_OPS_PERFORMED commit
will be deleted just as other commits.

Relates #28038
  • Loading branch information
dnhatn committed Jan 12, 2018
1 parent 0c0dc3c commit ba770b5
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
return Math.min(commits.size() - 1, i + 1);
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
// If a 6.x node with a 5.x index is promoted to be a primary, it will flush a new index commit to
// make sure translog operations without seqno will never be replayed (see IndexShard#updateShardState).
// However the global checkpoint is still UNASSIGNED and the max_seqno of both commits are NO_OPS_PERFORMED.
// If this policy considers the first commit as a safe commit, we will send the first commit without replaying
// translog between these commits to the replica in a peer-recovery. This causes the replica missing those operations.
// To prevent this, we should not keep more than one commit whose max_seqno is NO_OPS_PERFORMED.
// Once we can retain a safe commit, a NO_OPS_PERFORMED commit will be deleted just as other commits.
if (maxSeqNoFromCommit == SequenceNumbers.NO_OPS_PERFORMED) {
return i;
}
if (maxSeqNoFromCommit <= globalCheckpoint) {
return i;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -149,6 +150,54 @@ public void testLegacyIndex() throws Exception {
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
}

public void testKeepSingleNoOpsCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomLong());
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);

final List<IndexCommit> commitList = new ArrayList<>();
final int numOfNoOpsCommits = between(1, 10);
long lastNoopTranslogGen = 0;
for (int i = 0; i < numOfNoOpsCommits; i++) {
lastNoopTranslogGen += between(1, 20);
commitList.add(mockIndexCommit(SequenceNumbers.NO_OPS_PERFORMED, translogUUID, lastNoopTranslogGen));
}
// Keep only one no_ops commit.
indexPolicy.onCommit(commitList);
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastNoopTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastNoopTranslogGen));
for (int i = 0; i < numOfNoOpsCommits - 1; i++) {
verify(commitList.get(i), times(1)).delete();
}
verify(commitList.get(commitList.size() - 1), never()).delete();
// Add a some good commits.
final int numOfGoodCommits = between(1, 5);
long maxSeqNo = 0;
long lastTranslogGen = lastNoopTranslogGen;
for (int i = 0; i < numOfGoodCommits; i++) {
maxSeqNo += between(1, 1000);
lastTranslogGen += between(1, 20);
commitList.add(mockIndexCommit(maxSeqNo, translogUUID, lastTranslogGen));
}
// If the global checkpoint is still unassigned, we should still keep one NO_OPS_PERFORMED commit.
globalCheckpoint.set(SequenceNumbers.UNASSIGNED_SEQ_NO);
indexPolicy.onCommit(commitList);
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastNoopTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
for (int i = 0; i < numOfNoOpsCommits - 1; i++) {
verify(commitList.get(i), times(2)).delete();
}
verify(commitList.get(numOfNoOpsCommits - 1), never()).delete();
// Delete no-ops commit if global checkpoint advanced enough.
final long lower = Long.parseLong(commitList.get(numOfNoOpsCommits).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
globalCheckpoint.set(randomLongBetween(lower, Long.MAX_VALUE));
indexPolicy.onCommit(commitList);
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), greaterThan(lastNoopTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
verify(commitList.get(numOfNoOpsCommits - 1), times(1)).delete();
}

public void testDeleteInvalidCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
Expand Down

0 comments on commit ba770b5

Please sign in to comment.