Skip to content

Commit

Permalink
Add assert for OPEN_INDEX_CREATE_TRANSLOG
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 16, 2018
1 parent 64e692a commit a252d0b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,37 +412,38 @@ public void skipTranslogRecovery() {

private IndexCommit getStartingCommitPoint() throws IOException {
final IndexCommit startingIndexCommit;
final List<IndexCommit> existingCommits;
final List<IndexCommit> exisingComits;
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
startingIndexCommit = null;
break;
case OPEN_INDEX_CREATE_TRANSLOG:
// Use the last commit
existingCommits = DirectoryReader.listCommits(store.directory());
startingIndexCommit = existingCommits.get(existingCommits.size() - 1);
exisingComits = DirectoryReader.listCommits(store.directory());
assert exisingComits.size() == 1 : "Expect OPEN_INDEX_CREATE_TRANSLOG has a single index commit; exisingComits [" + exisingComits + "]";
startingIndexCommit = exisingComits.get(exisingComits.size() - 1);
break;
case OPEN_INDEX_AND_TRANSLOG:
// Use the safe commit
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
final long minRetainedTranslogGen = translog.getMinFileGeneration();
existingCommits = DirectoryReader.listCommits(store.directory());
exisingComits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog
// are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog are fully retained.
// To avoid this issue, we only select index exisingComits whose translog are fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
for (IndexCommit commit : exisingComits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
"exisingComits [" + exisingComits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
// TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(exisingComits, lastSyncedGlobalCheckpoint);
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4389,18 +4389,20 @@ public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception {

public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
IOUtils.close(engine);
final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get);
final Map<String, String> lastCommit;
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
engine.skipTranslogRecovery();
final int numDocs = between(5, 50);
for (int i = 0; i < numDocs; i++) {
index(engine, i);
if (randomBoolean()) {
engine.flush();
}
}
globalCheckpoint.set(engine.getLocalCheckpointTracker().getMaxSeqNo());
engine.getTranslog().sync();
engine.flush();
final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
assertThat("Should keep one commit with in-sync global checkpoint", commits, hasSize(1));
lastCommit = commits.get(commits.size() - 1).getUserData();
}
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
Expand Down

0 comments on commit a252d0b

Please sign in to comment.