Skip to content


Fill LocalCheckpointTracker with Lucene commit (#34474)
Browse files Browse the repository at this point in the history
Today we rely on the LocalCheckpointTracker to ensure there are no duplicates
when enabling the optimization that uses max_seq_no_of_updates. The problem is
that the LocalCheckpointTracker is not fully reloaded when opening an engine
with an out-of-order index commit. Suppose the starting commit has seq#0
and seq#2; the current LocalCheckpointTracker would then return "false"
when asked whether seq#2 was processed before, although seq#2 is in the commit.

This change scans the existing sequence numbers in the starting commit
and marks them as completed in the LocalCheckpointTracker to ensure a
consistent state between the LocalCheckpointTracker and the Lucene commit.
  • Loading branch information
dnhatn committed Oct 20, 2018
1 parent a8bfb79 commit 33fe1b3
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 11 deletions.
39 changes: 39 additions & 0 deletions server/src/main/java/org/elasticsearch/common/lucene/
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
Expand All @@ -42,6 +43,7 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
Expand Down Expand Up @@ -78,6 +80,7 @@
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

import java.text.ParseException;
Expand All @@ -86,6 +89,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

public class Lucene {
public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70";
Expand Down Expand Up @@ -918,4 +922,39 @@ public CacheHelper getReaderCacheHelper() {
public static NumericDocValuesField newSoftDeletesField() {
return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);

* Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo} (inclusive) and {@code toSeqNo} (inclusive)
* in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found.
* @param directoryReader the directory reader to scan
* @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive)
* @param toSeqNo the upper bound of a range of seq_no to scan (inclusive)
* @param onNewSeqNo the callback to be called whenever a new valid sequence number is found
public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo,
LongConsumer onNewSeqNo) throws IOException {
final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader);
final IndexSearcher searcher = new IndexSearcher(reader);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
final Weight weight = searcher.createWeight(query, false, 1.0f);
for (LeafReaderContext leaf : reader.leaves()) {
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
final DocIdSetIterator docIdSetIterator = scorer.iterator();
final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId);
final long seqNo = seqNoDocValues.longValue();
assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo;
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class InternalEngine extends Engine {
Expand Down Expand Up @@ -193,7 +194,6 @@ public InternalEngine(EngineConfig engineConfig) {
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy =
Expand Down Expand Up @@ -227,6 +227,8 @@ public InternalEngine(EngineConfig engineConfig) {
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger,
() -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
success = true;
Expand All @@ -242,16 +244,29 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
final SequenceNumbers.CommitInfo seqNoStats =
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos,
Logger logger, Supplier<Searcher> searcherSupplier, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
try {
final SequenceNumbers.CommitInfo seqNoStats =
final long maxSeqNo = seqNoStats.maxSeqNo;
final long localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
// Operations that are optimized using max_seq_no_of_updates optimization must not be processed twice; otherwise, they will
// create duplicates in Lucene. To avoid this we check the LocalCheckpointTracker to see if an operation was already processed.
// Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and
// Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to
// disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker.
if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Searcher searcher = searcherSupplier.get()) {
Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo, tracker::markSeqNoAsCompleted);
return tracker;
} catch (IOException ex) {
throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex);

private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
Expand Down Expand Up @@ -678,6 +693,8 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
} else if (op.seqNo() > docAndSeqNo.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" +;
// load term to tie break
final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
if (op.primaryTerm() > existingTerm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5111,6 +5111,77 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception {

public void testRebuildLocalCheckpointTracker() throws Exception {
Settings.Builder settings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Path translogPath = createTempDir();
int numOps = scaledRandomIntBetween(1, 500);
List<Engine.Operation> operations = new ArrayList<>();
for (int i = 0; i < numOps; i++) {
long seqNo = i;
final ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null);
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
} else if (randomBoolean()) {
operations.add(new Engine.Delete(doc.type(),, EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
} else {
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
threadPool.relativeTimeInMillis(), "test-" + i));
List<List<Engine.Operation>> commits = new ArrayList<>();
commits.add(new ArrayList<>());
try (Store store = createStore()) {
EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
try (InternalEngine engine = createEngine(config)) {
List<Engine.Operation> flushedOperations = new ArrayList<>();
for (Engine.Operation op : operations) {
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
} else if (op instanceof Engine.Delete) {
engine.delete((Engine.Delete) op);
} else {
engine.noOp((Engine.NoOp) op);
if (randomInt(100) < 10) {
if (randomInt(100) < 5) {
commits.add(new ArrayList<>(flushedOperations));
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
List<Engine.Operation> safeCommit = null;
for (int i = commits.size() - 1; i >= 0; i--) {
if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) {
safeCommit = commits.get(i);
assertThat(safeCommit, notNullValue());
try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog
final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker();
for (Engine.Operation op : operations) {
assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op)));

static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.anyOf;
Expand Down Expand Up @@ -682,6 +684,50 @@ public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception {

public void testAddNewReplicas() throws Exception {
try (ReplicationGroup shards = createGroup(between(0, 1))) {
Thread[] threads = new Thread[between(1, 3)];
AtomicBoolean isStopped = new AtomicBoolean();
boolean appendOnly = randomBoolean();
AtomicInteger docId = new AtomicInteger();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (isStopped.get() == false) {
try {
if (appendOnly) {
String id = randomBoolean() ? Integer.toString(docId.incrementAndGet()) : null;
shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON));
} else if (frequently()) {
String id = Integer.toString(frequently() ? docId.incrementAndGet() : between(0, 10));
shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON));
} else {
String id = Integer.toString(between(0, docId.get()));
shards.delete(new DeleteRequest(index.getName(), "type", id));
if (randomInt(100) < 10) {
shards.getPrimary().flush(new FlushRequest());
} catch (Exception ex) {
throw new AssertionError(ex);
assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(50)));
IndexShard newReplica = shards.addReplica();
assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(100)));
for (Thread thread : threads) {
assertBusy(() -> assertThat(getDocIdAndSeqNos(newReplica), equalTo(getDocIdAndSeqNos(shards.getPrimary()))));

public static class BlockingTarget extends RecoveryTarget {

private final CountDownLatch recoveryBlocked;
Expand Down

0 comments on commit 33fe1b3

Please sign in to comment.