Enable indexing optimization using sequence numbers on replicas (#43616)
This PR enables the indexing optimization using sequence numbers on
replicas. With this optimization, indexing on replicas should be faster
and use less memory as it can forgo the version lookup when possible.
This change also deactivates the append-only optimization on replicas.

Relates #34099
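
For context, a minimal sketch of the rule this change relies on. This is hypothetical standalone code, not the Elasticsearch API; `msu` stands for the engine's max seq# of updates or deletes, and `processedCheckpoint` for the local checkpoint below which every operation has been processed:

```java
// Hedged sketch of the replica-side optimization, not Elasticsearch code.
// msu: max seq# of any update or delete this engine has processed.
// processedCheckpoint: every op with seq# <= this value is already processed.
final class ReplicaIndexingSketch {
    // An incoming index op can be appended without a version lookup when every
    // update/delete ever processed (seq# <= msu) already sits at or below the
    // processed checkpoint: nothing still in flight could collide with the op.
    static boolean canSkipVersionLookup(long msu, long processedCheckpoint) {
        return msu <= processedCheckpoint;
    }
}
```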
dnhatn committed Jul 6, 2019
1 parent d3ddedf commit 9089820
Showing 10 changed files with 170 additions and 227 deletions.
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -157,7 +157,6 @@ public class InternalEngine extends Engine {
     private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
     private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
     private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
-    private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
     // max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine.
     // An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
     // The value of this marker never goes backwards, and is tracked/updated differently on primary and replica.
@@ -413,17 +412,11 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
 
     private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
         for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
-            final String key = entry.getKey();
-            if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
+            if (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) {
                 assert maxUnsafeAutoIdTimestamp.get() == -1 :
                     "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
                 updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true);
             }
-            if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
-                assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 :
-                    "max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]";
-                maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue()));
-            }
         }
     }

@@ -950,46 +943,35 @@ public IndexResult index(Index index) throws IOException {
 
     protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
         assert assertNonPrimaryOrigin(index);
+        // needs to maintain the auto_id timestamp in case this replica becomes primary
+        if (canOptimizeAddDocument(index)) {
+            mayHaveBeenIndexedBefore(index);
+        }
         final IndexingStrategy plan;
-        final boolean appendOnlyRequest = canOptimizeAddDocument(index);
-        if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
-            /*
-             * As soon as an append-only request was indexed into the primary, it can be exposed to a search then users can issue
-             * a follow-up operation on it. In rare cases, the follow up operation can be arrived and processed on a replica before
-             * the original append-only. In this case we can't simply proceed with the append only without consulting the version map.
-             * If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen
-             * the document of that append-only request. However if the seqno of an append-only is higher than seqno of any non-append-only
-             * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
-             */
-            assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
-            plan = IndexingStrategy.optimizedAppendOnly(1L);
+        // unlike the primary, replicas don't really care to about creation status of documents
+        // this allows to ignore the case where a document was found in the live version maps in
+        // a delete state and return false for the created flag in favor of code simplicity
+        final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
+        if (hasBeenProcessedBefore(index)) {
+            // the operation seq# was processed and thus the same operation was already put into lucene
+            // this can happen during recovery where older operations are sent from the translog that are already
+            // part of the lucene commit (either from a peer recovery or a local translog)
+            // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
+            // question may have been deleted in an out of order op that is not replayed.
+            // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
+            // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
+            plan = IndexingStrategy.processButSkipLucene(false, index.version());
+        } else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpointTracker.getProcessedCheckpoint()) {
+            // see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers
+            assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes;
+            plan = IndexingStrategy.optimizedAppendOnly(index.version());
         } else {
-            if (appendOnlyRequest == false) {
-                maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
-                assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" +
-                    "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
-            }
             versionMap.enforceSafeAccess();
-            // unlike the primary, replicas don't really care to about creation status of documents
-            // this allows to ignore the case where a document was found in the live version maps in
-            // a delete state and return false for the created flag in favor of code simplicity
-            if (index.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()){
-                // the operation seq# is lower then the current local checkpoint and thus was already put into lucene
-                // this can happen during recovery where older operations are sent from the translog that are already
-                // part of the lucene commit (either from a peer recovery or a local translog)
-                // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
-                // question may have been deleted in an out of order op that is not replayed.
-                // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
-                // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
-                plan = IndexingStrategy.processButSkipLucene(false, index.version());
+            final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
+            if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
+                plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version());
             } else {
-                final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
-                if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
-                    plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version());
-                } else {
-                    plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
-                        index.version());
-                }
+                plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version());
             }
         }
         return plan;
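
To see why the removed `maxSeqNoOfNonAppendOnlyOperations` bookkeeping could be dropped, it may help to replay the race the deleted comment described against the new check. The following is an illustrative, runnable demo with made-up sequence numbers, not engine code:

```java
// Illustrative only: the out-of-order append-only scenario under the new MSU rule.
// Primary indexes append-only doc A at seq#10; a follow-up update of A gets seq#11;
// the replica happens to receive seq#11 before seq#10.
public final class MsuRaceDemo {
    public static void main(String[] args) {
        long processedCheckpoint = 9;  // ops 0..9 already processed on the replica
        long msu = 9;                  // max seq# of updates/deletes seen so far

        // The update at seq#11 arrives first; updates always advance MSU.
        msu = Math.max(msu, 11);

        // Now the original append-only arrives at seq#10.
        boolean skipLookup = msu <= processedCheckpoint; // 11 <= 9 -> false
        System.out.println("append-only at seq#10 may skip the version lookup: " + skipLookup);
        // Prints false: the replica consults the version map, which is exactly the
        // unsafe interleaving the removed append-only bookkeeping used to guard against.
    }
}
```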
@@ -1119,11 +1101,6 @@ private boolean mayHaveBeenIndexedBefore(Index index) {
         return mayHaveBeenIndexBefore;
     }
 
-    // for testing
-    long getMaxSeqNoOfNonAppendOnlyOperations() {
-        return maxSeqNoOfNonAppendOnlyOperations.get();
-    }
-
     private void addDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
         if (docs.size() > 1) {
             indexWriter.addDocuments(docs);
@@ -1172,7 +1149,7 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda
                 Optional.of(earlyResultOnPreFlightError);
         }
 
-        public static IndexingStrategy optimizedAppendOnly(long versionForIndexing) {
+        static IndexingStrategy optimizedAppendOnly(long versionForIndexing) {
             return new IndexingStrategy(true, false, true, false, versionForIndexing, null);
         }

@@ -1317,15 +1294,9 @@ protected DeletionStrategy deletionStrategyForOperation(final Delete delete) thr
 
     protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
         assert assertNonPrimaryOrigin(delete);
-        maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
-        assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
-            "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
-        // unlike the primary, replicas don't really care to about found status of documents
-        // this allows to ignore the case where a document was found in the live version maps in
-        // a delete state and return true for the found flag in favor of code simplicity
         final DeletionStrategy plan;
-        if (delete.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()) {
-            // the operation seq# is lower then the current local checkpoint and thus was already put into lucene
+        if (hasBeenProcessedBefore(delete)) {
+            // the operation seq# was processed thus this operation was already put into lucene
             // this can happen during recovery where older operations are sent from the translog that are already
             // part of the lucene commit (either from a peer recovery or a local translog)
             // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
@@ -1502,7 +1473,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
             noOpResult = new NoOpResult(getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, preFlightError.get());
         } else {
             markSeqNoAsSeen(noOp.seqNo());
-            if (softDeleteEnabled) {
+            if (softDeleteEnabled && hasBeenProcessedBefore(noOp) == false) {
                 try {
                     final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
                     tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
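
A note on the recurring `hasBeenProcessedBefore` checks in these hunks: unlike a plain `seqNo <= processedCheckpoint` comparison, which only covers the gap-free prefix of the history, a per-seq# "processed" test also catches operations that completed above the checkpoint while earlier seq#s are still missing. A hedged, simplified stand-in for the engine's local checkpoint tracker (illustrative only, not the Elasticsearch class):

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch of a local checkpoint tracker (assumption: not real engine code).
final class SimpleSeqNoTracker {
    private long processedCheckpoint = -1;          // seq#s 0..checkpoint are all processed
    private final Set<Long> processedAbove = new HashSet<>();

    void markProcessed(long seqNo) {
        if (seqNo == processedCheckpoint + 1) {
            processedCheckpoint++;
            // absorb any contiguous run that was waiting above the checkpoint
            while (processedAbove.remove(processedCheckpoint + 1)) {
                processedCheckpoint++;
            }
        } else if (seqNo > processedCheckpoint) {
            processedAbove.add(seqNo);
        }
    }

    // The shape of test the diff switches to: true for any already-processed op,
    // even one that landed above the checkpoint.
    boolean hasProcessed(long seqNo) {
        return seqNo <= processedCheckpoint || processedAbove.contains(seqNo);
    }
}
```

Under this reading, the delete and no-op paths become idempotent per individual seq# rather than per checkpoint prefix, which is what lets the no-op path skip writing a duplicate tombstone.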
