From a252d0b51247ef878912729c5e435f0682e7f1fb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 15 Jan 2018 22:15:40 -0500 Subject: [PATCH] Add assert for OPEN_INDEX_CREATE_TRANSLOG --- .../index/engine/InternalEngine.java | 17 +++++++++-------- .../index/engine/InternalEngineTests.java | 10 ++++++---- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1efbd0706d156..201ba8f08fafa 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -412,37 +412,38 @@ public void skipTranslogRecovery() { private IndexCommit getStartingCommitPoint() throws IOException { final IndexCommit startingIndexCommit; - final List existingCommits; + final List exisingComits; switch (openMode) { case CREATE_INDEX_AND_TRANSLOG: startingIndexCommit = null; break; case OPEN_INDEX_CREATE_TRANSLOG: // Use the last commit - existingCommits = DirectoryReader.listCommits(store.directory()); - startingIndexCommit = existingCommits.get(existingCommits.size() - 1); + exisingComits = DirectoryReader.listCommits(store.directory()); + assert exisingComits.size() == 1 : "Expect OPEN_INDEX_CREATE_TRANSLOG has a single index commit; exisingComits [" + exisingComits + "]"; + startingIndexCommit = exisingComits.get(exisingComits.size() - 1); break; case OPEN_INDEX_AND_TRANSLOG: // Use the safe commit final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); final long minRetainedTranslogGen = translog.getMinFileGeneration(); - existingCommits = DirectoryReader.listCommits(store.directory()); + exisingComits = DirectoryReader.listCommits(store.directory()); // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. - // To avoid this issue, we only select index commits whose translog are fully retained. + // To avoid this issue, we only select index exisingComits whose translog are fully retained. if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) { final List recoverableCommits = new ArrayList<>(); - for (IndexCommit commit : existingCommits) { + for (IndexCommit commit : exisingComits) { if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { recoverableCommits.add(commit); } } assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " + - "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; + "exisingComits [" + exisingComits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); } else { // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint. - startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(exisingComits, lastSyncedGlobalCheckpoint); } break; default: diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index db62db7e01b46..773435335594b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4389,18 +4389,20 @@ public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception { public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception { IOUtils.close(engine); - final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get); final Map lastCommit; try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { engine.skipTranslogRecovery(); final int numDocs = between(5, 50); for (int i = 0; i < numDocs; i++) { index(engine, i); - if (randomBoolean()) { - engine.flush(); - } } + globalCheckpoint.set(engine.getLocalCheckpointTracker().getMaxSeqNo()); + engine.getTranslog().sync(); + engine.flush(); final List commits = DirectoryReader.listCommits(engine.store.directory()); + assertThat("Should keep one commit with in-sync global checkpoint", commits, hasSize(1)); lastCommit = commits.get(commits.size() - 1).getUserData(); } try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {