Skip to content

Commit

Permalink
Use standard semantics for retried auto-id requests (elastic#47311)
Browse files Browse the repository at this point in the history
Adds support for handling auto-id requests with optype CREATE. Also simplifies the code
handling this by using the standard indexing path when dealing with possible retry conflicts.

Relates elastic#47169
  • Loading branch information
ywelsch committed Oct 2, 2019
1 parent 7086d1b commit b2b14a3
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1005,13 +1005,9 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
if (mayHaveBeenIndexedBefore(index)) {
plan = IndexingStrategy.overrideExistingAsIfNotThere();
versionMap.enforceSafeAccess();
} else {
plan = IndexingStrategy.optimizedAppendOnly(1L);
}
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
if (canOptimizeAddDocument && mayHaveBeenIndexedBefore(index) == false) {
plan = IndexingStrategy.optimizedAppendOnly(1L);
} else {
versionMap.enforceSafeAccess();
// resolves incoming version
Expand Down Expand Up @@ -1044,7 +1040,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
index.versionType().updateVersion(currentVersion, index.version())
canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version())
);
}
}
Expand Down Expand Up @@ -1196,11 +1192,6 @@ static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted,
true, false, versionForIndexing, null);
}

static IndexingStrategy overrideExistingAsIfNotThere() {
return new IndexingStrategy(true, true, true,
false, 1L, null);
}

public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false,
false, versionForIndexing, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3421,49 +3421,71 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
public void testDoubleDeliveryPrimary() throws IOException {
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index operation = appendOnlyPrimary(doc, false, 1);
Engine.Index retry = appendOnlyPrimary(doc, true, 1);
final boolean create = randomBoolean();
Engine.Index operation = appendOnlyPrimary(doc, false, 1, create);
Engine.Index retry = appendOnlyPrimary(doc, true, 1, create);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(indexResult.getTranslogLocation());
Engine.IndexResult retryResult = engine.index(retry);
assertLuceneOperations(engine, 1, 1, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
assertLuceneOperations(engine, 1, create ? 0 : 1, 0);
assertEquals(1, engine.getNumVersionLookups());
if (create) {
assertNull(retryResult.getTranslogLocation());
} else {
assertNotNull(retryResult.getTranslogLocation());
}
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertLuceneOperations(engine, 0, 1, 0);
assertEquals(0, engine.getNumVersionLookups());
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
Engine.IndexResult indexResult = engine.index(operation);
assertLuceneOperations(engine, 0, 2, 0);
assertEquals(0, engine.getNumVersionLookups());
assertLuceneOperations(engine, 1, create ? 0 : 1, 0);
assertEquals(2, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
if (create) {
assertNull(indexResult.getTranslogLocation());
} else {
assertNotNull(indexResult.getTranslogLocation());
}
}

engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
assertEquals(1, topDocs.totalHits.value);
}
operation = appendOnlyPrimary(doc, false, 1);
retry = appendOnlyPrimary(doc, true, 1);
operation = appendOnlyPrimary(doc, false, 1, create);
retry = appendOnlyPrimary(doc, true, 1, create);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(indexResult.getTranslogLocation());
if (create) {
assertNull(indexResult.getTranslogLocation());
} else {
assertNotNull(indexResult.getTranslogLocation());
}
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
if (create) {
assertNull(retryResult.getTranslogLocation());
} else {
assertNotNull(retryResult.getTranslogLocation());
}
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
if (create) {
assertNull(retryResult.getTranslogLocation());
} else {
assertNotNull(retryResult.getTranslogLocation());
}
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
if (create) {
assertNull(indexResult.getTranslogLocation());
} else {
assertNotNull(indexResult.getTranslogLocation());
}
}

engine.refresh("test");
Expand Down Expand Up @@ -3520,60 +3542,53 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException
public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
final Supplier<ParsedDocument> doc = () -> testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index operation = appendOnlyReplica(doc.get(), false, 1, randomIntBetween(0, 5));
Engine.Index retry = appendOnlyReplica(doc.get(), true, 1, randomIntBetween(0, 5));
// operations with a seq# equal or lower to the local checkpoint are not indexed to lucene
// and the version lookup is skipped
final boolean sameSeqNo = operation.seqNo() == retry.seqNo();
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(indexResult.getTranslogLocation());
Engine.IndexResult retryResult = engine.index(retry);
if (retry.seqNo() > operation.seqNo()) {
assertLuceneOperations(engine, 1, 1, 0);
} else {
assertLuceneOperations(engine, 1, 0, 0);
}
assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
Engine.IndexResult indexResult = engine.index(operation);
if (operation.seqNo() > retry.seqNo()) {
assertLuceneOperations(engine, 1, 1, 0);
} else {
assertLuceneOperations(engine, 1, 0, 0);
}
assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
}
boolean replicaOperationIsRetry = randomBoolean();
Engine.Index operation = appendOnlyReplica(doc.get(), replicaOperationIsRetry, 1, randomIntBetween(0, 5));

Engine.IndexResult result = engine.index(operation);
assertLuceneOperations(engine, 1, 0, 0);
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(result.getTranslogLocation());

// promote to primary: first do refresh
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
assertEquals(1, topDocs.totalHits.value);
}
operation = randomAppendOnly(doc.get(), false, 1);
retry = randomAppendOnly(doc.get(), true, 1);

final boolean create = randomBoolean();
operation = appendOnlyPrimary(doc.get(), false, 1, create);
Engine.Index retry = appendOnlyPrimary(doc.get(), true, 1, create);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(indexResult.getTranslogLocation());
// if the replica operation wasn't a retry, the operation arriving on the newly promoted primary must be a retry
if (replicaOperationIsRetry) {
Engine.IndexResult indexResult = engine.index(operation);
if (create) {
assertNull(indexResult.getTranslogLocation());
} else {
assertNotNull(indexResult.getTranslogLocation());
}
}
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
if (create) {
assertNull(retryResult.getTranslogLocation());
} else {
assertNotNull(retryResult.getTranslogLocation());
}
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
if (create) {
assertNull(retryResult.getTranslogLocation());
} else {
assertNotNull(retryResult.getTranslogLocation());
}
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
if (create) {
assertNull(indexResult.getTranslogLocation());
} else {
assertNotNull(indexResult.getTranslogLocation());
}
}

engine.refresh("test");
Expand Down Expand Up @@ -3651,10 +3666,11 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep
assertThat(indexResult.getVersion(), equalTo(1L));

isRetry = true;
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL,
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
assertNotEquals(indexResult.getSeqNo(), UNASSIGNED_SEQ_NO);
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
Expand Down Expand Up @@ -3695,7 +3711,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs()
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO,
0);
Engine.IndexResult indexResult = engine.index(secondIndexRequest);
assertTrue(indexResult.isCreated());
assertFalse(indexResult.isCreated());
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
Expand All @@ -3720,12 +3736,16 @@ public Engine.Index randomAppendOnly(ParsedDocument doc, boolean retry, final lo
}
}

public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, boolean create) {
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry,
UNASSIGNED_SEQ_NO, 0);
}

public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
return appendOnlyPrimary(doc, retry, autoGeneratedIdTimestamp, randomBoolean());
}

public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, final long seqNo) {
return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, null,
Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0);
Expand All @@ -3736,14 +3756,15 @@ public void testRetryConcurrently() throws InterruptedException, IOException {
int numDocs = randomIntBetween(1000, 10000);
List<Engine.Index> docs = new ArrayList<>();
final boolean primary = randomBoolean();
final boolean create = randomBoolean();
for (int i = 0; i < numDocs; i++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(i), null,
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
final Engine.Index originalIndex;
final Engine.Index retryIndex;
if (primary) {
originalIndex = appendOnlyPrimary(doc, false, i);
retryIndex = appendOnlyPrimary(doc, true, i);
originalIndex = appendOnlyPrimary(doc, false, i, create);
retryIndex = appendOnlyPrimary(doc, true, i, create);
} else {
originalIndex = appendOnlyReplica(doc, false, i, i * 2);
retryIndex = appendOnlyReplica(doc, true, i, i * 2);
Expand Down Expand Up @@ -3776,25 +3797,12 @@ public void testRetryConcurrently() throws InterruptedException, IOException {
for (int i = 0; i < thread.length; i++) {
thread[i].join();
}
if (primary) {
assertEquals(0, engine.getNumVersionLookups());
assertEquals(0, engine.getNumIndexVersionsLookups());
} else {
// we don't really know what order the operations will arrive and thus can't predict how many
// version lookups will be needed
assertThat(engine.getNumIndexVersionsLookups(), lessThanOrEqualTo(engine.getNumVersionLookups()));
}
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
int count = searcher.count(new MatchAllDocsQuery());
assertEquals(numDocs, count);
}
if (primary) {
// primaries rely on lucene dedup and may index the same document twice
assertThat(engine.getNumDocUpdates(), greaterThanOrEqualTo((long) numDocs));
assertThat(engine.getNumDocAppends() + engine.getNumDocUpdates(), equalTo(numDocs * 2L));
} else {
// replicas rely on seq# based dedup and in this setup (same seq#) should never rely on lucene
if (create || primary == false) {
assertLuceneOperations(engine, numDocs, 0, 0);
}
}
Expand Down Expand Up @@ -3826,7 +3834,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException {
assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
engine.index(appendOnlyPrimary(doc, true, timestamp2));
engine.index(appendOnlyPrimary(doc, true, timestamp2, false));
assertEquals(maxTimestamp12, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
globalCheckpoint.set(1); // make sure flush cleans up commits for later.
engine.flush();
Expand Down Expand Up @@ -4762,7 +4770,7 @@ public void testSeqNoGenerator() throws IOException {
parsedDocument,
UNASSIGNED_SEQ_NO,
randomIntBetween(1, 8),
Versions.MATCH_ANY,
Versions.NOT_FOUND,
VersionType.INTERNAL,
Engine.Operation.Origin.PRIMARY,
System.nanoTime(),
Expand Down

0 comments on commit b2b14a3

Please sign in to comment.