Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCR: Optimize indexing ops using seq_no on followers #34099

Merged
merged 14 commits into from
Sep 29, 2018
26 changes: 26 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1834,6 +1834,32 @@ public interface TranslogRecoveryRunner {
* Returns the maximum sequence number of either update or delete operations have been processed in this engine
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
* as an update operation if it overwrites the existing documents in Lucene index with the same document id.
* <p>
* A note on the optimization using max_seq_no_of_updates_or_deletes:
* For each operation O, the key invariants are:
* <ol>
* <li> I1. There is no operation on docID(O) with seqno that is > MSU(O) and < seqno(O) </li>
* <li> I2. If MSU(O) < seqno(O) then docID(O) did not exist when O was applied; more precisely, if there is any O'
* * with seqno(O') < seqno(O) and docID(O') = docID(O) then the one with the greatest seqno is a delete. </li>
* </ol>
* <p>
* When a receiving shard (either a replica or a follower) receives an operation O, it must first ensure its own MSU is >= MSU(O),
* and then compares its MSU to its local checkpoint (LCP). If LCP < MSU then there’s a gap: there may be some operations that act
* on docID(O) about which we do not yet know, so we cannot perform an add. Note this also covers the case where a future operation O'
* with seqNo(O') > seqNo(O) and docId(O') = docID(O) is processed before O. In that case MSU(O') is at least seqNo(O') and this
* means MSU >= seqNo(O') > seqNo(O) > LCP (because O wasn't processed yet).
* <p>
* However, if MSU <= LCP then there is no gap: we have processed every operation <= LCP, and no operation O' with seqno(O') > LCP
* and seqno(O') < seqno(O) also has docID(O') = docID(O), because such an operation would have seqno(O') > LCP >= MSU >= MSU(O)
* which contradicts the first invariant. Furthermore in this case we immediately know that docID(O) has been deleted
* (or never existed) without needing to check Lucene for the following reason. If there's no earlier operation on docID(O) then
* this is clear, so suppose instead that the preceding operation on docID(O) is O':
* 1. The first invariant above tells us that seqno(O') <= MSU(O) <= LCP so we have already applied O' to Lucene.
* 2. Also MSU(O) <= MSU <= LCP < seqno(O) (we discard O if seqno(O) ≤ LCP) so the second invariant applies,
* meaning that the O' was a delete.
* <p>
* Moreover, operations that are optimized using the MSU optimization will not be processed twice as this will create duplicates
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a sentence - "Therefore, if MSU<= LCP < seqno(O) we know that O can safely be optimized with and added to lucene with addDocument. Moreover, operations"...

* in Lucene. To avoid it we check the local checkpoint tracker to see if an operation was already processed.
*
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
* requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
*/
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo(), 1L);
} else {
if (appendOnlyRequest == false) {
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
Expand Down Expand Up @@ -927,7 +927,7 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
versionMap.enforceSafeAccess();
} else {
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index), 1L);
}
} else {
versionMap.enforceSafeAccess();
Expand Down Expand Up @@ -1082,8 +1082,8 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda
Optional.of(earlyResultOnPreFlightError);
}

static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null);
public static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing, long versionForIndexing) {
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, versionForIndexing, null);
}

static IndexingStrategy skipDueToVersionConflict(
Expand All @@ -1104,7 +1104,8 @@ static IndexingStrategy overrideExistingAsIfNotThere(
return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null);
}

static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) {
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing,
long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null);
}

Expand Down Expand Up @@ -2331,6 +2332,16 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}

/**
* Checks if the given operation has been processed in this engine or not.
* @return true if the given operation was processed; otherwise false.
*/
protected final boolean hasBeenProcessedBefore(Operation op) {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
return localCheckpointTracker.contains(op.seqNo());
}

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ Releasable acquireLock(BytesRef uid) {
return keyedLock.acquire(uid);
}

private boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]";
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,11 @@ protected static BytesArray bytesArray(String string) {
return new BytesArray(string.getBytes(Charset.defaultCharset()));
}

protected static Term newUid(String id) {
public static Term newUid(String id) {
return new Term("_id", Uid.encodeId(id));
}

protected Term newUid(ParsedDocument doc) {
public static Term newUid(ParsedDocument doc) {
return newUid(doc.id());
}

Expand Down Expand Up @@ -643,7 +643,7 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
throw new UnsupportedOperationException("unknown version type: " + versionType);
}
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null),
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), SOURCE, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
Expand Down Expand Up @@ -734,7 +734,7 @@ public static void assertOpsOnReplica(
}
}

protected void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
Thread[] thread = new Thread[randomIntBetween(3, 5)];
CountDownLatch startGun = new CountDownLatch(thread.length);
AtomicInteger offset = new AtomicInteger(-1);
Expand Down Expand Up @@ -877,7 +877,7 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
}
}

protected MapperService createMapperService(String type) throws IOException {
public static MapperService createMapperService(String type) throws IOException {
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.index.engine;

import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
Expand All @@ -18,6 +19,8 @@
*/
public final class FollowingEngine extends InternalEngine {

private final CounterMetric numOfOptimizedIndexing = new CounterMetric();

/**
* Construct a new following engine with the specified engine configuration.
*
Expand Down Expand Up @@ -51,7 +54,20 @@ private void preFlight(final Operation operation) {
@Override
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
preFlight(index);
return planIndexingAsNonPrimary(index);
// NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
if (hasBeenProcessedBefore(index)) {
return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());

} else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) {
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]";
numOfOptimizedIndexing.inc();
return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.seqNo(), index.version());

} else {
return planIndexingAsNonPrimary(index);
}
}

@Override
Expand Down Expand Up @@ -85,4 +101,11 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
return true;
}

/**
* Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine.
* This metric is not persisted, and started from 0 when the engine is opened.
*/
public long getNumberOfOptimizedIndexing() {
return numOfOptimizedIndexing.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
Expand All @@ -52,6 +53,7 @@
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
Expand Down Expand Up @@ -210,7 +212,7 @@ public void testFollowIndex() throws Exception {
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}

assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
unfollowIndex("index2");
client().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
Expand All @@ -234,6 +236,7 @@ public void testFollowIndex() throws Exception {
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs + secondBatchNumDocs);
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfPrimaryShards);
}
Expand Down Expand Up @@ -347,6 +350,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true));

assertSameDocCount("index1", "index2");
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfShards,
client().prepareSearch("index2").get().getHits().totalHits);
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards);
}
Expand Down Expand Up @@ -436,6 +441,7 @@ public void testFollowIndexWithNestedField() throws Exception {
}
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 1, numDocs);
}

public void testUnfollowNonExistingIndex() {
Expand Down Expand Up @@ -473,7 +479,7 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureYellow("index1");

final int numDocs = 1024;
final int numDocs = between(10, 1024);
logger.info("Indexing [{}] docs", numDocs);
for (int i = 0; i < numDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
Expand All @@ -499,6 +505,7 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
}
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 1, numDocs);
}

public void testDontFollowTheWrongIndex() throws Exception {
Expand Down Expand Up @@ -871,6 +878,27 @@ private void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index follo
});
}

private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception {
assertBusy(() -> {
long[] numOfOptimizedOps = new long[numberOfShards];
for (int shardId = 0; shardId < numberOfShards; shardId++) {
for (String node : internalCluster().nodesInclude(followerIndex.getName())) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
if (shard != null) {
try {
FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard));
numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing();
} catch (AlreadyClosedException e) {
throw new AssertionError(e); // causes assertBusy to retry
}
}
}
}
assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal));
});
}

public static PutFollowAction.Request follow(String leaderIndex, String followerIndex) {
return new PutFollowAction.Request(resumeFollow(leaderIndex, followerIndex));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;

import java.io.IOException;
Expand Down Expand Up @@ -72,6 +73,9 @@ public void testSimpleCcrReplication() throws Exception {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
for (IndexShard shard : followerGroup) {
assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount));
}
// Deletes should be replicated to the follower
List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
for (String deleteId : deleteDocIds) {
Expand Down
Loading