Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCR: Optimize indexing ops using seq_no on followers #34099

Merged
merged 14 commits into from
Sep 29, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
* requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
*/
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo(), 1L);
} else {
if (appendOnlyRequest == false) {
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
Expand Down Expand Up @@ -927,7 +927,7 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
versionMap.enforceSafeAccess();
} else {
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index), 1L);
}
} else {
versionMap.enforceSafeAccess();
Expand Down Expand Up @@ -1082,8 +1082,8 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda
Optional.of(earlyResultOnPreFlightError);
}

static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null);
public static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing, long versionForIndexing) {
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, versionForIndexing, null);
}

static IndexingStrategy skipDueToVersionConflict(
Expand All @@ -1104,7 +1104,8 @@ static IndexingStrategy overrideExistingAsIfNotThere(
return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null);
}

static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) {
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing,
long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null);
}

Expand Down Expand Up @@ -2331,6 +2332,16 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}

/**
* Checks if the given operation has been processed in this engine or not.
* @return true if the given operation was processed; otherwise false.
*/
protected final boolean hasBeenProcessedBefore(Operation op) {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
return localCheckpointTracker.contains(op.seqNo());
}

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ Releasable acquireLock(BytesRef uid) {
return keyedLock.acquire(uid);
}

private boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]";
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,11 @@ protected static BytesArray bytesArray(String string) {
return new BytesArray(string.getBytes(Charset.defaultCharset()));
}

protected static Term newUid(String id) {
public static Term newUid(String id) {
return new Term("_id", Uid.encodeId(id));
}

protected Term newUid(ParsedDocument doc) {
public static Term newUid(ParsedDocument doc) {
return newUid(doc.id());
}

Expand Down Expand Up @@ -643,7 +643,7 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
throw new UnsupportedOperationException("unknown version type: " + versionType);
}
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null),
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), SOURCE, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
Expand Down Expand Up @@ -734,7 +734,7 @@ public static void assertOpsOnReplica(
}
}

protected void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
Thread[] thread = new Thread[randomIntBetween(3, 5)];
CountDownLatch startGun = new CountDownLatch(thread.length);
AtomicInteger offset = new AtomicInteger(-1);
Expand Down Expand Up @@ -877,7 +877,7 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
}
}

protected MapperService createMapperService(String type) throws IOException {
public static MapperService createMapperService(String type) throws IOException {
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.index.engine;

import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
Expand All @@ -18,6 +19,8 @@
*/
public final class FollowingEngine extends InternalEngine {

private final CounterMetric numOfOptimizedIndexing = new CounterMetric();

/**
* Construct a new following engine with the specified engine configuration.
*
Expand Down Expand Up @@ -51,7 +54,48 @@ private void preFlight(final Operation operation) {
@Override
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
preFlight(index);
return planIndexingAsNonPrimary(index);
/*
* A note about optimization using sequence numbers:
*
* 1. Indexing operations are processed concurrently in an engine. However, operations of the same docID are processed
* one by one under the docID lock.
*
* 2. An engine itself can resolve correctly if an operation is delivered multiple times. However, if an operation is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this note is correct? we don't execute if we see we did before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you forget to push something? this statement is not correct. There is no notion of an "optimized op" (for replicas) just an op with a seq# about the MSU. Also "However, if an operation is optimized and delivered multiple times, it will be appended into Lucene more than once." reads weird. Maybe as simple as "Operations that are optimized using the MSU optimization may not be processed twice as this will create duplicates in lucene. To avoid it we check the local checkpoint tracker to see if an operation was already processed".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've applied your suggestion.

* optimized and delivered multiple times, it will be appended into Lucene more than once. We void this issue by
* not executing operations which have been processed before (using LocalCheckpointTracker).
*
* 3. When replicating operations to replicas or followers, we also carry the max seq_no_of_updates_or_deletes on the
* leader to followers. This transfer guarantees the MUS on a follower when operation O is processed at least the
* MUS on the leader when it was executed [every operation O => MSU_r(O) >= MSU_p(O)].
*
* 4. The following proves that docID(O) does not exist on a follower when operation O is applied if MSU_r(O) <= LCP < seqno(O):
*
* 4.1) Given two operations O and O' with docID(O’) = docID(O) and seqno(O) < seqno(O’) then MSU_p(O') on the primary
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you mean MSU_p(O') must be at least seqno(O’) (as O' is an update)

* must be at least seqno(O). Moreover, the MSU_r on a follower >= min(seqno(O), seqno(O')) after these operations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moreover, the MSU_r on a follower >= min(seqno(O), seqno(O')) after these operations

I still don't follow this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I break it into two cases. I hope this is clear now.

* arrive in any order.
*
* 4.2) If such operation O' with docID(O’) = docID(O) and LCP < seqno(O’) then MSU_r(O) >= min(seqno(O), seqno(O')) > LCP
* because both arrived on the follower[4.1]. This contradicts the assumption [MSU_r(O) <= LCP].
*
* 4.3) MSU(O) < seqno(O) then docID(O) does not exist when O is applied on a leader. This means docID(O) does not exist
* after we apply every operation with docID = docID(O) and seqno < seqno(O). On the follower, we have applied every
* operation with seqno <= LCP, and there is no such O' with docID(O’) = docID(O) and LCP < seqno(O’)[4.2].
* These mean the follower has applied every operation with docID = docID(O) and seqno < seqno(O).
* Thus docID(O) does not exist on the follower.
*/
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
if (hasBeenProcessedBefore(index)) {
return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());

} else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) {
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]";
numOfOptimizedIndexing.inc();
return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.seqNo(), index.version());

} else {
return planIndexingAsNonPrimary(index);
}
}

@Override
Expand Down Expand Up @@ -85,4 +129,11 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
return true;
}

/**
* Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine.
* This metric is not persisted, and started from 0 when the engine is opened.
*/
public long getNumberOfOptimizedIndexing() {
return numOfOptimizedIndexing.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
Expand All @@ -49,6 +50,7 @@
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
Expand Down Expand Up @@ -202,7 +204,7 @@ public void testFollowIndex() throws Exception {
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}

assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
unfollowIndex("index2");
client().execute(FollowIndexAction.INSTANCE, followRequest).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
Expand All @@ -226,6 +228,7 @@ public void testFollowIndex() throws Exception {
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs + secondBatchNumDocs);
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfPrimaryShards);
}
Expand Down Expand Up @@ -342,6 +345,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true));

assertSameDocCount("index1", "index2");
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfShards,
client().prepareSearch("index2").get().getHits().totalHits);
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards);
}
Expand Down Expand Up @@ -766,6 +771,27 @@ private void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index follo
});
}

private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception {
assertBusy(() -> {
long[] numOfOptimizedOps = new long[numberOfShards];
for (int shardId = 0; shardId < numberOfShards; shardId++) {
for (String node : internalCluster().nodesInclude(followerIndex.getName())) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
if (shard != null) {
try {
FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard));
numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing();
} catch (AlreadyClosedException e) {
throw new AssertionError(e); // causes assertBusy to retry
}
}
}
}
assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal));
});
}

public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) {
FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex(leaderIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;

import java.io.IOException;
Expand Down Expand Up @@ -72,6 +73,9 @@ public void testSimpleCcrReplication() throws Exception {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
for (IndexShard shard : followerGroup) {
assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount));
}
// Deletes should be replicated to the follower
List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
for (String deleteId : deleteDocIds) {
Expand Down
Loading