From d7f6ef9f3627d1f075b173dd86efc59c09a8a743 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen <nhat.nguyen@elastic.co>
Date: Sun, 14 Jan 2018 21:16:16 -0500
Subject: [PATCH 1/9] Deletion policy should keep only starting commit on init

The current implementation of the combined deletion policy causes the following issues.

# Replica can use an unsafe commit for sequence-based recovery
1. A replica has two commits c1 {max_seqno=1} and c2 {max_seqno=2} with op2 is a stale operation
2. A primary has one commit c1 {max_seqno=1} and the global checkpoint is 1
3. A replica recovers from the primary with c1 as a starting commit
4. Indexes one document to the cluster; the global checkpoint advances to 2
5. Restart the replica
6. The replica will mistakenly use c2 (as max_seqno <= gcp) as a starting commit even it contains stale ops

# Translog generations can go backwards in sequence-based recovery
1. A replica has two commits c1 {local_checkpoint=1, translog_gen=1} and c2 {local_checkpoint=2, translog_gen=2}
2. A primary has one commit c1 {local_checkpoint=1} and the global checkpoint is 1
3. A replica recovers from the primary with c1 as a starting commit - c2 is kept as the last commit
4. Flush on a replica will throw exception as a new commit has `translog-gen=1` which is smaller the translog-gen of c2

# Snapshotted commit without translog can be considered a safe commit
1. An index was created before 6.2 has two commits: c1 (snapshotted without translog), c2 unsafe commit
2. We will select c2 as a starting commit when opening an engine as we make sure to select only commits with translog
3. Flush a new commit c3, the policy will consider c1 as a safe commit even it does not have translog.

These issues can be avoided if the combined deletion policy keeps only the starting commit onInit.
---
 .../index/engine/CombinedDeletionPolicy.java  | 18 +++++++-
 .../index/engine/InternalEngine.java          |  3 +-
 .../engine/CombinedDeletionPolicyTests.java   | 39 ++++++++++++++--
 .../index/engine/InternalEngineTests.java     |  9 +++-
 .../RecoveryDuringReplicationTests.java       | 46 ++++++++++++++++++-
 5 files changed, 104 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
index e5d8cacf73657..b52e539c4bda2 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
@@ -45,15 +45,17 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
     private final TranslogDeletionPolicy translogDeletionPolicy;
     private final EngineConfig.OpenMode openMode;
     private final LongSupplier globalCheckpointSupplier;
+    private final IndexCommit startingCommit;
     private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
     private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
     private IndexCommit lastCommit; // the most recent commit point
 
     CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
-                           LongSupplier globalCheckpointSupplier) {
+                           LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
         this.openMode = openMode;
         this.translogDeletionPolicy = translogDeletionPolicy;
         this.globalCheckpointSupplier = globalCheckpointSupplier;
+        this.startingCommit = startingCommit;
         this.snapshottedCommits = new ObjectIntHashMap<>();
     }
 
@@ -61,21 +63,33 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
     public void onInit(List<? extends IndexCommit> commits) throws IOException {
         switch (openMode) {
             case CREATE_INDEX_AND_TRANSLOG:
+                assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
                 break;
             case OPEN_INDEX_CREATE_TRANSLOG:
                 assert commits.isEmpty() == false : "index is opened, but we have no commits";
+                assert startingCommit == null : "OPEN_INDEX_CREATE_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
                 // When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately.
                 // We therefore can simply skip processing here as `onCommit` will be called right after with a new commit.
                 break;
             case OPEN_INDEX_AND_TRANSLOG:
                 assert commits.isEmpty() == false : "index is opened, but we have no commits";
-                onCommit(commits);
+                assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
+                    + "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
+                keepOnlyStartingCommitOnInit(commits);
                 break;
             default:
                 throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
         }
     }
 
+    private synchronized void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) throws IOException {
+        commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete);
+        assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
+        lastCommit = startingCommit;
+        safeCommit = startingCommit;
+        updateTranslogDeletionPolicy();
+    }
+
     @Override
     public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
         final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 1b7b891efd6ff..441a2010540e2 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -185,7 +185,7 @@ public InternalEngine(EngineConfig engineConfig) {
                     "Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
                 this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
                 this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
-                    translog::getLastSyncedGlobalCheckpoint);
+                    translog::getLastSyncedGlobalCheckpoint, startingCommit);
                 writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
                 updateMaxUnsafeAutoIdTimestampFromWriter(writer);
                 assert engineConfig.getForceNewHistoryUUID() == false
@@ -429,6 +429,7 @@ private IndexCommit getStartingCommitPoint() throws IOException {
                     "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
                 return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
             } else {
+                // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
                 return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
             }
         }
diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
index e74cde52aa418..8a85040323f34 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
@@ -54,7 +54,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
     public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong();
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
+            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
 
         final LongArrayList maxSeqNoList = new LongArrayList();
         final LongArrayList translogGenList = new LongArrayList();
@@ -93,7 +94,8 @@ public void testAcquireIndexCommit() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong();
         final UUID translogUUID = UUID.randomUUID();
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
+            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
         long lastMaxSeqNo = between(1, 1000);
         long lastTranslogGen = between(1, 20);
         int safeIndex = 0;
@@ -156,11 +158,12 @@ public void testLegacyIndex() throws Exception {
         final UUID translogUUID = UUID.randomUUID();
 
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
+            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
 
         long legacyTranslogGen = randomNonNegativeLong();
         IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
-        indexPolicy.onInit(singletonList(legacyCommit));
+        indexPolicy.onCommit(singletonList(legacyCommit));
         verify(legacyCommit, never()).delete();
         assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
         assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen));
@@ -188,7 +191,8 @@ public void testLegacyIndex() throws Exception {
     public void testDeleteInvalidCommits() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
+            OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
 
         final int invalidCommits = between(1, 10);
         final List<IndexCommit> commitList = new ArrayList<>();
@@ -211,6 +215,31 @@ public void testDeleteInvalidCommits() throws Exception {
         }
     }
 
+    public void testKeepOnlyStartingCommitOnInit() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
+        TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
+        final UUID translogUUID = UUID.randomUUID();
+        final List<IndexCommit> commitList = new ArrayList<>();
+        int totalCommits = between(2, 20);
+        for (int i = 0; i < totalCommits; i++) {
+            commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong()));
+        }
+        final IndexCommit startingCommit = randomFrom(commitList);
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
+            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, startingCommit);
+        indexPolicy.onInit(commitList);
+        for (IndexCommit commit : commitList) {
+            if (commit.equals(startingCommit) == false) {
+                verify(commit, times(1)).delete();
+            }
+        }
+        verify(startingCommit, never()).delete();
+        assertThat(translogPolicy.getMinTranslogGenerationForRecovery(),
+            equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
+        assertThat(translogPolicy.getTranslogGenerationOfLastCommit(),
+            equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
+    }
+
     IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
         final Map<String, String> userData = new HashMap<>();
         userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 518411e59e8cd..127b8a24f1364 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -4010,13 +4010,15 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
 
 
         boolean flushed = false;
+        AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
         Engine recoveringEngine = null;
         try {
             assertEquals(docs - 1, engine.getLocalCheckpointTracker().getMaxSeqNo());
             assertEquals(docs - 1, engine.getLocalCheckpointTracker().getCheckpoint());
             assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo());
             assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint());
-            recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
+            recoveringEngine = new InternalEngine(copy(
+                replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
             assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations());
             recoveringEngine.recoverFromTranslog();
             assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
@@ -4038,6 +4040,8 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
                 assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
                 assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint());
                 if ((flushed = randomBoolean())) {
+                    globalCheckpoint.set(recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
+                    recoveringEngine.getTranslog().sync();
                     recoveringEngine.flush(true, true);
                 }
             }
@@ -4047,7 +4051,8 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
 
         // now do it again to make sure we preserve values etc.
         try {
-            recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
+            recoveringEngine = new InternalEngine(
+                copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
             if (flushed) {
                 assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations());
             }
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index aa97c2049915f..cd948ed9f9036 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -304,8 +304,52 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
             replica.store().close();
             newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
             shards.recoverReplica(newReplica);
-
             shards.assertAllEqual(totalDocs);
+            // Make sure that flushing on a recovering shard is ok.
+            shards.flush();
+            shards.assertAllEqual(totalDocs);
+        }
+    }
+
+    public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception {
+        try (ReplicationGroup shards = createGroup(2)) {
+            shards.startAll();
+            IndexShard oldPrimary = shards.getPrimary();
+            IndexShard newPrimary = shards.getReplicas().get(0);
+            IndexShard replica = shards.getReplicas().get(1);
+            int goodDocs = shards.indexDocs(scaledRandomIntBetween(1, 20));
+            shards.flush();
+            // simulate docs that were inflight when primary failed, these will be rolled back
+            int staleDocs = scaledRandomIntBetween(1, 10);
+            logger.info("--> indexing {} stale docs", staleDocs);
+            for (int i = 0; i < staleDocs; i++) {
+                final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "stale_" + i)
+                    .source("{}", XContentType.JSON);
+                final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
+                indexOnReplica(bulkShardRequest, replica);
+            }
+            shards.flush();
+            shards.promoteReplicaToPrimary(newPrimary).get();
+            // Recover a replica should rollback the stale documents
+            shards.removeReplica(replica);
+            replica.close("recover replica - first time", false);
+            replica.store().close();
+            replica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
+            shards.recoverReplica(replica);
+            shards.assertAllEqual(goodDocs);
+            // Index more docs - move the global checkpoint >= seqno of the stale operations.
+            goodDocs += shards.indexDocs(scaledRandomIntBetween(staleDocs, staleDocs * 5));
+            shards.syncGlobalCheckpoint();
+            assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(replica.seqNoStats().getMaxSeqNo()));
+            // Recover a replica again should also rollback the stale documents.
+            shards.removeReplica(replica);
+            replica.close("recover replica - second time", false);
+            replica.store().close();
+            IndexShard anotherReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
+            shards.recoverReplica(anotherReplica);
+            shards.assertAllEqual(goodDocs);
+            shards.flush();
+            shards.assertAllEqual(goodDocs);
         }
     }
 

From f157ed58c0f72466198dfee9906c24fb52a8a2da Mon Sep 17 00:00:00 2001
From: Nhat Nguyen <nhat.nguyen@elastic.co>
Date: Mon, 15 Jan 2018 16:41:07 -0500
Subject: [PATCH 2/9] Add comment

---
 .../index/engine/CombinedDeletionPolicy.java  | 22 +++++++++++++++++++
 .../engine/CombinedDeletionPolicyTests.java   |  4 ++++
 2 files changed, 26 insertions(+)

diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
index b52e539c4bda2..693270875d35d 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
@@ -82,6 +82,28 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
         }
     }
 
+    /**
+     * Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe
+     * at the recovering time but they can suddenly become safe in the future.
+     * The following issues can happen if unsafe commits are kept oninit.
+     * <p>
+     * 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1)
+     * and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2)
+     * is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use
+     * the unsafe commit c2(max_seqno=2 <= gcp=2) as the starting commit for sequenced-based recovery even the commit c2
+     * contains a stale operation and the document(with seqno=2) will not be replicated to the replica.
+     * <p>
+     * 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when are replica with a safe commit
+     * c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2).
+     * The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new
+     * commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery
+     * translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1
+     * while the local checkpoint of c2 is 2.
+     * <p>
+     * 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced
+     * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
+     * the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog.
+     */
     private synchronized void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) throws IOException {
         commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete);
         assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
index 8a85040323f34..ca6059dae0067 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
@@ -215,6 +215,10 @@ public void testDeleteInvalidCommits() throws Exception {
         }
     }
 
+    /**
+     * Keeping existing unsafe commits can be problematic because these commits are not safe at the recovering time
+     * but they can suddenly become safe in the future. See {@link CombinedDeletionPolicy#keepOnlyStartingCommitOnInit(List)}
+     */
     public void testKeepOnlyStartingCommitOnInit() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();

From 11daf0959462715aa1d7018d565e58290bb2928d Mon Sep 17 00:00:00 2001
From: Nhat Nguyen <nhat.nguyen@elastic.co>
Date: Mon, 15 Jan 2018 16:53:25 -0500
Subject: [PATCH 3/9] Use starting commit for OPEN_INDEX_CREATE_TRANSLOG

---
 .../index/engine/CombinedDeletionPolicy.java  | 11 ++--
 .../index/engine/InternalEngine.java          | 59 +++++++++++--------
 .../org/elasticsearch/index/store/Store.java  | 10 +---
 3 files changed, 42 insertions(+), 38 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
index 693270875d35d..d2f7959a2d689 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
@@ -66,11 +66,6 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
                 assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
                 break;
             case OPEN_INDEX_CREATE_TRANSLOG:
-                assert commits.isEmpty() == false : "index is opened, but we have no commits";
-                assert startingCommit == null : "OPEN_INDEX_CREATE_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
-                // When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately.
-                // We therefore can simply skip processing here as `onCommit` will be called right after with a new commit.
-                break;
             case OPEN_INDEX_AND_TRANSLOG:
                 assert commits.isEmpty() == false : "index is opened, but we have no commits";
                 assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
@@ -109,7 +104,11 @@ private synchronized void keepOnlyStartingCommitOnInit(List<? extends IndexCommi
         assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
         lastCommit = startingCommit;
         safeCommit = startingCommit;
-        updateTranslogDeletionPolicy();
+        // OPEN_INDEX_CREATE_TRANSLOG can open an index commit from other shard with a different translog history,
+        // We therefore should not use that index commit to update the translog deletion policy.
+        if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
+            updateTranslogDeletionPolicy();
+        }
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 441a2010540e2..1efbd0706d156 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -411,29 +411,44 @@ public void skipTranslogRecovery() {
     }
 
     private IndexCommit getStartingCommitPoint() throws IOException {
-        if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
-            final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
-            final long minRetainedTranslogGen = translog.getMinFileGeneration();
-            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
-            // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose full translog
-            // files are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
-            // To avoid this issue, we only select index commits whose translog files are fully retained.
-            if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
-                final List<IndexCommit> recoverableCommits = new ArrayList<>();
-                for (IndexCommit commit : existingCommits) {
-                    if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
-                        recoverableCommits.add(commit);
+        final IndexCommit startingIndexCommit;
+        final List<IndexCommit> existingCommits;
+        switch (openMode) {
+            case CREATE_INDEX_AND_TRANSLOG:
+                startingIndexCommit = null;
+                break;
+            case OPEN_INDEX_CREATE_TRANSLOG:
+                // Use the last commit
+                existingCommits = DirectoryReader.listCommits(store.directory());
+                startingIndexCommit = existingCommits.get(existingCommits.size() - 1);
+                break;
+            case OPEN_INDEX_AND_TRANSLOG:
+                // Use the safe commit
+                final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
+                final long minRetainedTranslogGen = translog.getMinFileGeneration();
+                existingCommits = DirectoryReader.listCommits(store.directory());
+                // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog
+                // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
+                // To avoid this issue, we only select index commits whose translog are fully retained.
+                if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
+                    final List<IndexCommit> recoverableCommits = new ArrayList<>();
+                    for (IndexCommit commit : existingCommits) {
+                        if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
+                            recoverableCommits.add(commit);
+                        }
                     }
+                    assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
+                        "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
+                    startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
+                } else {
+                    // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
+                    startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
                 }
-                assert recoverableCommits.isEmpty() == false : "No commit point with full translog found; " +
-                    "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
-                return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
-            } else {
-                // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
-                return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
-            }
+                break;
+            default:
+                throw new IllegalArgumentException("unknown mode: " + openMode);
         }
-        return null;
+        return startingIndexCommit;
     }
 
     private void recoverFromTranslogInternal() throws IOException {
@@ -558,9 +573,7 @@ private ExternalSearcherManager createSearcherManager(SearchFactory externalSear
                 final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
                 internalSearcherManager = new SearcherManager(directoryReader,
                         new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
-                // The index commit from IndexWriterConfig is null if the engine is open with other modes
-                // rather than CREATE_INDEX_AND_TRANSLOG. In those cases lastCommittedSegmentInfos will be retrieved from the last commit.
-                lastCommittedSegmentInfos = store.readCommittedSegmentsInfo(indexWriter.getConfig().getIndexCommit());
+                lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
                 ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
                     externalSearcherFactory);
                 success = true;
diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java
index 74be98b813238..7aab2c750d139 100644
--- a/server/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -182,17 +182,9 @@ public Directory directory() {
      * @throws IOException if the index is corrupted or the segments file is not present
      */
     public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
-        return readCommittedSegmentsInfo(null);
-    }
-
-    /**
-     * Returns the committed segments info for the given commit point.
-     * If the commit point is not provided, this method will return the segments info of the last commit in the store.
-     */
-    public SegmentInfos readCommittedSegmentsInfo(final IndexCommit commit) throws IOException {
         failIfCorrupted();
         try {
-            return readSegmentsInfo(commit, directory());
+            return readSegmentsInfo(null, directory());
         } catch (CorruptIndexException ex) {
             markStoreCorrupted(ex);
             throw ex;

From 5c2853db7bb518cbdf391671199be9d345576881 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen <nhat.nguyen@elastic.co>
Date: Mon, 15 Jan 2018 17:48:20 -0500
Subject: [PATCH 4/9] Add engine tests

---
 .../index/engine/InternalEngineTests.java     | 54 +++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 127b8a24f1364..db62db7e01b46 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -163,6 +163,7 @@
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
 import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
@@ -4360,4 +4361,57 @@ public void testAcquireIndexCommit() throws Exception {
             assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));
         }
     }
+
+    public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception {
+        IOUtils.close(engine);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
+        final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get);
+        final IndexCommit safeCommit;
+        try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
+            final int numDocs = between(5, 50);
+            for (int i = 0; i < numDocs; i++) {
+                index(engine, i);
+                if (randomBoolean()) {
+                    engine.flush();
+                }
+            }
+            // Selects a starting commit and advances and persists the global checkpoint to that commit.
+            final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
+            safeCommit = randomFrom(commits);
+            globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
+            engine.getTranslog().sync();
+        }
+        try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
+            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(engine.store.directory());
+            assertThat("OPEN_INDEX_AND_TRANSLOG should keep only safe commit", existingCommits, contains(safeCommit));
+        }
+    }
+
+    public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
+        IOUtils.close(engine);
+        final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
+        final Map<String, String> lastCommit;
+        try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
+            engine.skipTranslogRecovery();
+            final int numDocs = between(5, 50);
+            for (int i = 0; i < numDocs; i++) {
+                index(engine, i);
+                if (randomBoolean()) {
+                    engine.flush();
+                }
+            }
+            final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
+            lastCommit = commits.get(commits.size() - 1).getUserData();
+        }
+        try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
+            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(engine.store.directory());
+            assertThat("OPEN_INDEX_CREATE_TRANSLOG should keep only last commit", existingCommits, hasSize(1));
+            final Map<String, String> userData = existingCommits.get(0).getUserData();
+            assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(lastCommit.get(SequenceNumbers.MAX_SEQ_NO)));
+            assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(lastCommit.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
+            // Translog tags should be fresh.
+            assertThat(userData.get(Translog.TRANSLOG_UUID_KEY), not(equalTo(lastCommit.get(Translog.TRANSLOG_UUID_KEY))));
+            assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("1"));
+        }
+    }
 }

From bd544cfa97e5021190fb17a01dd2c4d55f14f063 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen <nhat.nguyen@elastic.co>
Date: Mon, 15 Jan 2018 18:00:02 -0500
Subject: [PATCH 5/9] Fix comment

---
 .../elasticsearch/index/engine/CombinedDeletionPolicy.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
index d2f7959a2d689..41dbb649d5ca3 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
@@ -85,8 +85,8 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
      * 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1)
      * and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2)
      * is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use
-     * the unsafe commit c2(max_seqno=2 <= gcp=2) as the starting commit for sequenced-based recovery even the commit c2
-     * contains a stale operation and the document(with seqno=2) will not be replicated to the replica.
+     * the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequenced-based recovery even the
+     * commit c2 contains a stale operation and the document(with seqno=2) will not be replicated to the replica.
      * <p>
      * 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when are replica with a safe commit
      * c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2).

From 64e692a9e9041368b7545a54c69d7fcb774400bb Mon Sep 17 00:00:00 2001
From: Nhat Nguyen <nhat.nguyen@elastic.co>
Date: Mon, 15 Jan 2018 22:00:57 -0500
Subject: [PATCH 6/9] Move OPEN_INDEX_CREATE_TRANSLOG case to onInit

---
 .../index/engine/CombinedDeletionPolicy.java       | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
index 41dbb649d5ca3..ca0d93fa7c5aa 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
@@ -60,7 +60,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
     }
 
     @Override
-    public void onInit(List<? extends IndexCommit> commits) throws IOException {
+    public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
         switch (openMode) {
             case CREATE_INDEX_AND_TRANSLOG:
                 assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
@@ -71,6 +71,11 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
                 assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
                     + "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
                 keepOnlyStartingCommitOnInit(commits);
+                // OPEN_INDEX_CREATE_TRANSLOG can open an index commit from other shard with a different translog history,
+                // We therefore should not use that index commit to update the translog deletion policy.
+                if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
+                    updateTranslogDeletionPolicy();
+                }
                 break;
             default:
                 throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
@@ -99,16 +104,11 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
      * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
      * the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog.
      */
-    private synchronized void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) throws IOException {
+    private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) {
         commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete);
         assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
         lastCommit = startingCommit;
         safeCommit = startingCommit;
-        // OPEN_INDEX_CREATE_TRANSLOG can open an index commit from other shard with a different translog history,
-        // We therefore should not use that index commit to update the translog deletion policy.
-        if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
-            updateTranslogDeletionPolicy();
-        }
     }
 
     @Override

From a252d0b51247ef878912729c5e435f0682e7f1fb Mon Sep 17 00:00:00 2001
From: Nhat Nguyen <nhat.nguyen@elastic.co>
Date: Mon, 15 Jan 2018 22:15:40 -0500
Subject: [PATCH 7/9] Add assert for OPEN_INDEX_CREATE_TRANSLOG

---
 .../index/engine/InternalEngine.java            | 17 +++++++++--------
 .../index/engine/InternalEngineTests.java       | 10 ++++++----
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 1efbd0706d156..201ba8f08fafa 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -412,37 +412,38 @@ public void skipTranslogRecovery() {
 
     private IndexCommit getStartingCommitPoint() throws IOException {
         final IndexCommit startingIndexCommit;
-        final List<IndexCommit> existingCommits;
+        final List<IndexCommit> exisingComits;
         switch (openMode) {
             case CREATE_INDEX_AND_TRANSLOG:
                 startingIndexCommit = null;
                 break;
             case OPEN_INDEX_CREATE_TRANSLOG:
                 // Use the last commit
-                existingCommits = DirectoryReader.listCommits(store.directory());
-                startingIndexCommit = existingCommits.get(existingCommits.size() - 1);
+                exisingComits = DirectoryReader.listCommits(store.directory());
+                assert exisingComits.size() == 1 : "Expect OPEN_INDEX_CREATE_TRANSLOG has a single index commit; exisingComits [" + exisingComits + "]";
+                startingIndexCommit = exisingComits.get(exisingComits.size() - 1);
                 break;
             case OPEN_INDEX_AND_TRANSLOG:
                 // Use the safe commit
                 final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
                 final long minRetainedTranslogGen = translog.getMinFileGeneration();
-                existingCommits = DirectoryReader.listCommits(store.directory());
+                exisingComits = DirectoryReader.listCommits(store.directory());
                 // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog
                 // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
-                // To avoid this issue, we only select index commits whose translog are fully retained.
+                // To avoid this issue, we only select index exisingComits whose translog are fully retained.
                 if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
                     final List<IndexCommit> recoverableCommits = new ArrayList<>();
-                    for (IndexCommit commit : existingCommits) {
+                    for (IndexCommit commit : exisingComits) {
                         if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
                             recoverableCommits.add(commit);
                         }
                     }
                     assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
-                        "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
+                        "exisingComits [" + exisingComits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
                     startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
                 } else {
                     // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
-                    startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
+                    startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(exisingComits, lastSyncedGlobalCheckpoint);
                 }
                 break;
             default:
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index db62db7e01b46..773435335594b 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -4389,18 +4389,20 @@ public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception {
 
     public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
         IOUtils.close(engine);
-        final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
+        final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get);
         final Map<String, String> lastCommit;
         try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
             engine.skipTranslogRecovery();
             final int numDocs = between(5, 50);
             for (int i = 0; i < numDocs; i++) {
                 index(engine, i);
-                if (randomBoolean()) {
-                    engine.flush();
-                }
             }
+            globalCheckpoint.set(engine.getLocalCheckpointTracker().getMaxSeqNo());
+            engine.getTranslog().sync();
+            engine.flush();
             final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
+            assertThat("Should keep one commit with in-sync global checkpoint", commits, hasSize(1));
             lastCommit = commits.get(commits.size() - 1).getUserData();
         }
         try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {

From 97d3825e4809519503c5aef05c6e6eccb5ad2080 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen <nhat.nguyen@elastic.co>
Date: Mon, 15 Jan 2018 22:58:11 -0500
Subject: [PATCH 8/9] Revert "Add assert for OPEN_INDEX_CREATE_TRANSLOG"

This reverts commit a252d0b51247ef878912729c5e435f0682e7f1fb.
---
 .../index/engine/InternalEngine.java            | 17 ++++++++---------
 .../index/engine/InternalEngineTests.java       | 10 ++++------
 2 files changed, 12 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 201ba8f08fafa..1efbd0706d156 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -412,38 +412,37 @@ public void skipTranslogRecovery() {
 
     private IndexCommit getStartingCommitPoint() throws IOException {
         final IndexCommit startingIndexCommit;
-        final List<IndexCommit> exisingComits;
+        final List<IndexCommit> existingCommits;
         switch (openMode) {
             case CREATE_INDEX_AND_TRANSLOG:
                 startingIndexCommit = null;
                 break;
             case OPEN_INDEX_CREATE_TRANSLOG:
                 // Use the last commit
-                exisingComits = DirectoryReader.listCommits(store.directory());
-                assert exisingComits.size() == 1 : "Expect OPEN_INDEX_CREATE_TRANSLOG has a single index commit; exisingComits [" + exisingComits + "]";
-                startingIndexCommit = exisingComits.get(exisingComits.size() - 1);
+                existingCommits = DirectoryReader.listCommits(store.directory());
+                startingIndexCommit = existingCommits.get(existingCommits.size() - 1);
                 break;
             case OPEN_INDEX_AND_TRANSLOG:
                 // Use the safe commit
                 final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
                 final long minRetainedTranslogGen = translog.getMinFileGeneration();
-                exisingComits = DirectoryReader.listCommits(store.directory());
+                existingCommits = DirectoryReader.listCommits(store.directory());
                 // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog
                 // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
-                // To avoid this issue, we only select index exisingComits whose translog are fully retained.
+                // To avoid this issue, we only select index commits whose translog are fully retained.
                 if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
                     final List<IndexCommit> recoverableCommits = new ArrayList<>();
-                    for (IndexCommit commit : exisingComits) {
+                    for (IndexCommit commit : existingCommits) {
                         if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
                             recoverableCommits.add(commit);
                         }
                     }
                     assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
-                        "exisingComits [" + exisingComits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
+                        "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
                     startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
                 } else {
                     // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
-                    startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(exisingComits, lastSyncedGlobalCheckpoint);
+                    startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
                 }
                 break;
             default:
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 773435335594b..db62db7e01b46 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -4389,20 +4389,18 @@ public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception {
 
     public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
         IOUtils.close(engine);
-        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
-        final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get);
+        final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
         final Map<String, String> lastCommit;
         try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
             engine.skipTranslogRecovery();
             final int numDocs = between(5, 50);
             for (int i = 0; i < numDocs; i++) {
                 index(engine, i);
+                if (randomBoolean()) {
+                    engine.flush();
+                }
             }
-            globalCheckpoint.set(engine.getLocalCheckpointTracker().getMaxSeqNo());
-            engine.getTranslog().sync();
-            engine.flush();
             final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
-            assertThat("Should keep one commit with in-sync global checkpoint", commits, hasSize(1));
             lastCommit = commits.get(commits.size() - 1).getUserData();
         }
         try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {

From 26df28a485863b18ecd05b61327194adc12d4e8b Mon Sep 17 00:00:00 2001
From: Nhat Nguyen <nhat.nguyen@elastic.co>
Date: Mon, 15 Jan 2018 23:01:34 -0500
Subject: [PATCH 9/9] Move assert to indexShard

---
 .../org/elasticsearch/index/shard/IndexShard.java | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 3832cd0ae2055..b5d28b3a9ecce 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -23,6 +23,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.CheckIndex;
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.LeafReaderContext;
@@ -1290,12 +1291,16 @@ public void createIndexAndTranslog() throws IOException {
 
     /** opens the engine on top of the existing lucene engine but creates an empty translog **/
     public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalCheckpoint) throws IOException {
-        assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
-            recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
-        SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
-        assert commitInfo.localCheckpoint >= globalCheckpoint :
-            "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
+        if (Assertions.ENABLED) {
+            assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
+                recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
+            SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
+            assert commitInfo.localCheckpoint >= globalCheckpoint :
+                "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
                     + globalCheckpoint + "]";
+            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
+            assert existingCommits.size() == 1 : "Open index create translog should have one commit, commits[" + existingCommits + "]";
+        }
         globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog");
         innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID);
     }