From 8b8e915ed107553d9623e04985b4d9d69c006c74 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 11 Apr 2017 21:15:35 +0200 Subject: [PATCH 1/6] add failing tests --- .../index/engine/InternalEngineTests.java | 11 +------- .../ESIndexLevelReplicationTestCase.java | 24 +++++++++++----- .../IndexLevelReplicationTests.java | 28 +++++++++++++++++++ 3 files changed, 46 insertions(+), 17 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2d3ba055df4aa..d61526a8bd363 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1367,19 +1367,10 @@ protected List generateSingleDocHistory(boolean forReplica, Ve public void testOutOfOrderDocsOnReplica() throws IOException { final List ops = generateSingleDocHistory(true, - randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 2, 20); + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); assertOpsOnReplica(ops, replicaEngine, true); } - public void testNonStandardVersioningOnReplica() throws IOException { - // TODO: this can be folded into testOutOfOrderDocsOnReplica once out of order - // is detected using seq# - final List ops = generateSingleDocHistory(true, - randomFrom(VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); - assertOpsOnReplica(ops, replicaEngine, false); - } - - public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException { IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder() .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 1d1af2b2fc591..c35f72d208533 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -27,13 +27,9 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.BulkShardResponse; import org.elasticsearch.action.bulk.TransportShardBulkActionTests; -import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; -import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.PlainActionFuture; @@ -98,6 +94,10 @@ protected ReplicationGroup createGroup(int replicas) throws IOException { } protected IndexMetaData buildIndexMetaData(int replicas) throws IOException { + return buildIndexMetaData(replicas, indexMapping); + } + + protected IndexMetaData buildIndexMetaData(int replicas, Map mappings) throws IOException { Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) @@ -105,7 +105,7 @@ protected IndexMetaData buildIndexMetaData(int replicas) throws IOException { IndexMetaData.Builder metaData = IndexMetaData.builder(index.getName()) .settings(settings) .primaryTerm(0, 1); - for (Map.Entry typeMapping : indexMapping.entrySet()) { + for (Map.Entry typeMapping : mappings.entrySet()) { metaData.putMapping(typeMapping.getKey(), typeMapping.getValue()); } return metaData.build(); @@ -224,15 +224,24 @@ public void startPrimary() throws IOException { updateAllocationIDsOnPrimary(); } - public synchronized IndexShard addReplica() throws IOException { + public IndexShard addReplica() throws IOException { final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false); final IndexShard replica = newShard(replicaRouting, indexMetaData, null, this::syncGlobalCheckpoint, getEngineFactory(replicaRouting)); + addReplica(replica); + return replica; + } + + public synchronized void addReplica(IndexShard replica) { + assert shardRoutings().stream() + .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : + "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; + replica.updatePrimaryTerm(primary.getPrimaryTerm()); replicas.add(replica); updateAllocationIDsOnPrimary(); - return replica; } + public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException { final ShardRouting shardRouting = TestShardRouting.newShardRouting( shardId, @@ -264,6 +273,7 @@ public synchronized void promoteReplicaToPrimary(IndexShard replica) throws IOEx } boolean found = replicas.remove(replica); assert found; + closeShards(primary); primary = replica; replica.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary()); updateAllocationIDsOnPrimary(); diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 7a11f89b73b2e..5f69370a5caeb 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -18,6 +18,9 @@ */ package org.elasticsearch.index.replication; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; @@ -37,6 +40,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -152,4 +156,28 @@ public void testCheckpointsAdvance() throws Exception { } } + public void testConflictingOpsOnReplica() throws Exception { + Map mappings = + Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); + try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) { + shards.startAll(); + IndexShard replica1 = shards.getReplicas().get(0); + logger.info("--> isolated replica " + replica1.routingEntry()); + shards.removeReplica(replica1); + IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON); + shards.index(indexRequest); + shards.addReplica(replica1); + logger.info("--> promoting replica to primary " + replica1.routingEntry()); + shards.promoteReplicaToPrimary(replica1); + indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"2\"}", XContentType.JSON); + shards.index(indexRequest); + shards.refresh("test"); + for (IndexShard shard : shards) { + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + TopDocs search = searcher.searcher().search(new TermQuery(new Term("f", "2")), 10); + assertEquals("shard " + shard.routingEntry() + " misses new version", 1, search.totalHits); + } + } + } + } } From ee20e16662fa5fe8eca5f878f4351ce4546ed8c9 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 11 Apr 2017 22:59:55 +0200 Subject: [PATCH 2/6] Revert "rollback seq no based out of order resolving and leave it at refactoring alone" --- ... => PerThreadIDAndVersionSeqNoLookup.java} | 33 ++- .../common/lucene/uid/Versions.java | 2 +- .../lucene/uid/VersionsAndSeqNoResolver.java | 183 ++++++++++++ .../common/lucene/uid/VersionsResolver.java | 263 ------------------ .../index/engine/DeleteVersionValue.java | 6 +- .../elasticsearch/index/engine/Engine.java | 6 +- .../index/engine/InternalEngine.java | 71 ++++- .../index/engine/VersionValue.java | 22 +- .../index/get/ShardGetService.java | 2 +- .../index/mapper/ParseContext.java | 14 +- .../index/mapper/ParsedDocument.java | 4 +- .../index/mapper/SeqNoFieldMapper.java | 15 +- .../index/termvectors/TermVectorsService.java | 2 +- .../common/lucene/uid/VersionLookupTests.java | 6 +- .../common/lucene/uid/VersionsTests.java | 20 +- .../index/IndexingSlowLogTests.java | 2 +- .../index/engine/InternalEngineTests.java | 19 +- .../index/engine/LiveVersionMapTests.java | 4 +- .../index/engine/VersionValueTests.java | 4 +- .../index/shard/IndexShardIT.java | 2 +- .../index/shard/IndexShardTests.java | 2 +- .../index/shard/RefreshListenersTests.java | 2 +- .../index/translog/TranslogTests.java | 2 +- .../recovery/RecoverySourceHandlerTests.java | 2 +- 24 files changed, 357 insertions(+), 331 deletions(-) rename core/src/main/java/org/elasticsearch/common/lucene/uid/{PerThreadIDAndVersionLookup.java => PerThreadIDAndVersionSeqNoLookup.java} (74%) create mode 100644 core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java delete mode 100644 core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionSeqNoLookup.java similarity index 74% rename from core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java rename to core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionSeqNoLookup.java index caaf7fc84af0a..4c7403b2ca5c7 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionSeqNoLookup.java @@ -29,9 +29,12 @@ import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbersService; import java.io.IOException; @@ -43,7 +46,7 @@ * in more than one document! It will only return the first one it * finds. */ -final class PerThreadIDAndVersionLookup { +final class PerThreadIDAndVersionSeqNoLookup { // TODO: do we really need to store all this stuff? some if it might not speed up anything. // we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff @@ -51,7 +54,10 @@ final class PerThreadIDAndVersionLookup { private final TermsEnum termsEnum; /** _version data */ private final NumericDocValues versions; - + /** _seq_no data */ + private final NumericDocValues seqNos; + /** _primary_term data */ + private final NumericDocValues primaryTerms; /** Reused for iteration (when the term exists) */ private PostingsEnum docsEnum; @@ -61,7 +67,7 @@ final class PerThreadIDAndVersionLookup { /** * Initialize lookup for the provided segment */ - PerThreadIDAndVersionLookup(LeafReader reader) throws IOException { + PerThreadIDAndVersionSeqNoLookup(LeafReader reader) throws IOException { Fields fields = reader.fields(); Terms terms = fields.terms(UidFieldMapper.NAME); termsEnum = terms.iterator(); @@ -74,6 +80,8 @@ final class PerThreadIDAndVersionLookup { throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field"); } + seqNos = reader.getNumericDocValues(SeqNoFieldMapper.NAME); + primaryTerms = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); Object readerKey = null; assert (readerKey = reader.getCoreCacheKey()) != null; this.readerKey = readerKey; @@ -113,4 +121,21 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException { return DocIdSetIterator.NO_MORE_DOCS; } } + + /** Return null if id is not found. */ + public DocIdAndSeqNo lookupSequenceNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException { + assert context.reader().getCoreCacheKey().equals(readerKey) : + "context's reader is not the same as the reader class was initialized on."; + int docID = getDocID(id, liveDocs); + if (docID != DocIdSetIterator.NO_MORE_DOCS) { + return new DocIdAndSeqNo(docID, seqNos == null ? SequenceNumbersService.UNASSIGNED_SEQ_NO : seqNos.get(docID), context); + } else { + return null; + } + } + + /** returns 0 if the primary term is not found */ + public long lookUpPrimaryTerm(int docID) throws IOException { + return primaryTerms == null ? 0 : primaryTerms.get(docID); + } } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java index b7c62a8f2443a..8cc612421551c 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java @@ -19,7 +19,7 @@ package org.elasticsearch.common.lucene.uid; -public final class Versions { +public class Versions { /** used to indicate the write operation should succeed regardless of current version **/ public static final long MATCH_ANY = -3L; diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java new file mode 100644 index 0000000000000..a402f9817385a --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -0,0 +1,183 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.lucene.uid; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReader.CoreClosedListener; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.Term; +import org.apache.lucene.util.CloseableThreadLocal; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.mapper.UidFieldMapper; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND; + +/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */ +public class VersionsAndSeqNoResolver { + + static final ConcurrentMap> lookupStates = + ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + + // Evict this reader from lookupStates once it's closed: + private static final CoreClosedListener removeLookupState = key -> { + CloseableThreadLocal ctl = lookupStates.remove(key); + if (ctl != null) { + ctl.close(); + } + }; + + private static PerThreadIDAndVersionSeqNoLookup getLookupState(LeafReader reader) throws IOException { + Object key = reader.getCoreCacheKey(); + CloseableThreadLocal ctl = lookupStates.get(key); + if (ctl == null) { + // First time we are seeing this reader's core; make a + // new CTL: + ctl = new CloseableThreadLocal<>(); + CloseableThreadLocal other = lookupStates.putIfAbsent(key, ctl); + if (other == null) { + // Our CTL won, we must remove it when the + // core is closed: + reader.addCoreClosedListener(removeLookupState); + } else { + // Another thread beat us to it: just use + // their CTL: + ctl = other; + } + } + + PerThreadIDAndVersionSeqNoLookup lookupState = ctl.get(); + if (lookupState == null) { + lookupState = new PerThreadIDAndVersionSeqNoLookup(reader); + ctl.set(lookupState); + } + + return lookupState; + } + + private VersionsAndSeqNoResolver() { + } + + /** Wraps an {@link LeafReaderContext}, a doc ID relative to the context doc base and a version. */ + public static class DocIdAndVersion { + public final int docId; + public final long version; + public final LeafReaderContext context; + + public DocIdAndVersion(int docId, long version, LeafReaderContext context) { + this.docId = docId; + this.version = version; + this.context = context; + } + } + + /** Wraps an {@link LeafReaderContext}, a doc ID relative to the context doc base and a seqNo. */ + public static class DocIdAndSeqNo { + public final int docId; + public final long seqNo; + public final LeafReaderContext context; + + public DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) { + this.docId = docId; + this.seqNo = seqNo; + this.context = context; + } + } + + + /** + * Load the internal doc ID and version for the uid from the reader, returning
    + *
  • null if the uid wasn't found, + *
  • a doc ID and a version otherwise + *
+ */ + public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException { + assert term.field().equals(UidFieldMapper.NAME); + List leaves = reader.leaves(); + if (leaves.isEmpty()) { + return null; + } + // iterate backwards to optimize for the frequently updated documents + // which are likely to be in the last segments + for (int i = leaves.size() - 1; i >= 0; i--) { + LeafReaderContext context = leaves.get(i); + LeafReader leaf = context.reader(); + PerThreadIDAndVersionSeqNoLookup lookup = getLookupState(leaf); + DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context); + if (result != null) { + return result; + } + } + return null; + } + + /** + * Load the internal doc ID and sequence number for the uid from the reader, returning
    + *
  • null if the uid wasn't found, + *
  • a doc ID and the associated seqNo otherwise + *
+ */ + public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException { + assert term.field().equals(UidFieldMapper.NAME); + List leaves = reader.leaves(); + if (leaves.isEmpty()) { + return null; + } + // iterate backwards to optimize for the frequently updated documents + // which are likely to be in the last segments + for (int i = leaves.size() - 1; i >= 0; i--) { + LeafReaderContext context = leaves.get(i); + LeafReader leaf = context.reader(); + PerThreadIDAndVersionSeqNoLookup lookup = getLookupState(leaf); + DocIdAndSeqNo result = lookup.lookupSequenceNo(term.bytes(), leaf.getLiveDocs(), context); + if (result != null) { + return result; + } + } + return null; + } + + /** + * Load the primaryTerm associated with the given {@link DocIdAndSeqNo} + */ + public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo) throws IOException { + LeafReader leaf = docIdAndSeqNo.context.reader(); + PerThreadIDAndVersionSeqNoLookup lookup = getLookupState(leaf); + long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId); + assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]" + + " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]"; + return result; + } + + /** + * Load the version for the uid from the reader, returning
    + *
  • {@link Versions#NOT_FOUND} if no matching doc exists, + *
  • the version associated with the provided uid otherwise + *
+ */ + public static long loadVersion(IndexReader reader, Term term) throws IOException { + final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); + return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; + } +} diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java deleted file mode 100644 index fb5875cbae5d2..0000000000000 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.lucene.uid; - -import org.apache.lucene.index.Fields; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReader.CoreClosedListener; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.PostingsEnum; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.index.Term; -import org.apache.lucene.index.Terms; -import org.apache.lucene.index.TermsEnum; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.CloseableThreadLocal; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; -import org.elasticsearch.index.mapper.UidFieldMapper; -import org.elasticsearch.index.seqno.SequenceNumbersService; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ConcurrentMap; - -import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND; - -/** Utility class to resolve the Lucene doc ID and version for a given uid. */ -public class VersionsResolver { - - static final ConcurrentMap> - lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); - - // Evict this reader from lookupStates once it's closed: - private static final CoreClosedListener removeLookupState = key -> { - CloseableThreadLocal ctl = lookupStates.remove(key); - if (ctl != null) { - ctl.close(); - } - }; - - private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader) - throws IOException { - Object key = reader.getCoreCacheKey(); - CloseableThreadLocal ctl = lookupStates.get(key); - if (ctl == null) { - // First time we are seeing this reader's core; make a - // new CTL: - ctl = new CloseableThreadLocal<>(); - CloseableThreadLocal other = - lookupStates.putIfAbsent(key, ctl); - if (other == null) { - // Our CTL won, we must remove it when the - // core is closed: - reader.addCoreClosedListener(removeLookupState); - } else { - // Another thread beat us to it: just use - // their CTL: - ctl = other; - } - } - - PerThreadIDAndVersionLookup lookupState = ctl.get(); - if (lookupState == null) { - lookupState = new PerThreadIDAndVersionLookup(reader); - ctl.set(lookupState); - } - - return lookupState; - } - - private VersionsResolver() { - } - - /** - * Wraps an {@link LeafReaderContext}, a doc ID relative to the context doc base and - * a version. - **/ - public static class DocIdAndVersion { - public final int docId; - public final long version; - public final LeafReaderContext context; - - public DocIdAndVersion(int docId, long version, LeafReaderContext context) { - this.docId = docId; - this.version = version; - this.context = context; - } - } - - /** - * Load the internal doc ID and version for the uid from the reader, returning
    - *
  • null if the uid wasn't found, - *
  • a doc ID and a version otherwise - *
- */ - public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) - throws IOException { - assert term.field().equals(UidFieldMapper.NAME); - List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return null; - } - // iterate backwards to optimize for the frequently updated documents - // which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReaderContext context = leaves.get(i); - LeafReader leaf = context.reader(); - PerThreadIDAndVersionLookup lookup = getLookupState(leaf); - DocIdAndVersion result = - lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context); - if (result != null) { - return result; - } - } - return null; - } - - /** - * Load the version for the uid from the reader, returning
    - *
  • {@link Versions#NOT_FOUND} if no matching doc exists, - *
  • the version associated with the provided uid otherwise - *
- */ - public static long loadVersion(IndexReader reader, Term term) throws IOException { - final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); - return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; - } - - /** - * Returns the sequence number for the given uid term, returning - * {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found. - */ - public static long loadSeqNo(IndexReader reader, Term term) throws IOException { - assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid"; - List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return SequenceNumbersService.UNASSIGNED_SEQ_NO; - } - - // iterate backwards to optimize for the frequently updated documents - // which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReader leaf = leaves.get(i).reader(); - Bits liveDocs = leaf.getLiveDocs(); - - TermsEnum termsEnum = null; - SortedNumericDocValues dvField = null; - PostingsEnum docsEnum = null; - - final Fields fields = leaf.fields(); - if (fields != null) { - Terms terms = fields.terms(UidFieldMapper.NAME); - if (terms != null) { - termsEnum = terms.iterator(); - assert termsEnum != null; - dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME); - assert dvField != null; - - final BytesRef id = term.bytes(); - if (termsEnum.seekExact(id)) { - // there may be more than one matching docID, in the - // case of nested docs, so we want the last one: - docsEnum = termsEnum.postings(docsEnum, 0); - int docID = DocIdSetIterator.NO_MORE_DOCS; - for (int d = docsEnum.nextDoc(); - d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) { - if (liveDocs != null && liveDocs.get(d) == false) { - continue; - } - docID = d; - } - - if (docID != DocIdSetIterator.NO_MORE_DOCS) { - dvField.setDocument(docID); - assert dvField.count() == 1 : - "expected only a single value for _seq_no but got " + - dvField.count(); - return dvField.valueAt(0); - } - } - } - } - - } - return SequenceNumbersService.UNASSIGNED_SEQ_NO; - } - - /** - * Returns the primary term for the given uid term, returning {@code 0} if none is found. - */ - public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException { - assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid"; - List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return 0; - } - - // iterate backwards to optimize for the frequently updated documents - // which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReader leaf = leaves.get(i).reader(); - Bits liveDocs = leaf.getLiveDocs(); - - TermsEnum termsEnum = null; - NumericDocValues dvField = null; - PostingsEnum docsEnum = null; - - final Fields fields = leaf.fields(); - if (fields != null) { - Terms terms = fields.terms(UidFieldMapper.NAME); - if (terms != null) { - termsEnum = terms.iterator(); - assert termsEnum != null; - dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - assert dvField != null; - - final BytesRef id = term.bytes(); - if (termsEnum.seekExact(id)) { - // there may be more than one matching docID, in the - // case of nested docs, so we want the last one: - docsEnum = termsEnum.postings(docsEnum, 0); - int docID = DocIdSetIterator.NO_MORE_DOCS; - for (int d = docsEnum.nextDoc(); - d != DocIdSetIterator.NO_MORE_DOCS; - d = docsEnum.nextDoc()) { - if (liveDocs != null && liveDocs.get(d) == false) { - continue; - } - docID = d; - } - - if (docID != DocIdSetIterator.NO_MORE_DOCS) { - return dvField.get(docID); - } - } - } - } - - } - return 0; - } -} diff --git a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java index 45b28d96ba283..9ba084a1a9f69 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java @@ -29,8 +29,8 @@ class DeleteVersionValue extends VersionValue { private final long time; - DeleteVersionValue(long version, long time) { - super(version); + DeleteVersionValue(long version,long seqNo, long term, long time) { + super(version, seqNo, term); this.time = time; } @@ -53,6 +53,8 @@ public long ramBytesUsed() { public String toString() { return "DeleteVersionValue{" + "version=" + getVersion() + + ", seqNo=" + getSeqNo() + + ", term=" + getTerm() + ",time=" + time + '}'; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 217bc4592823f..59655abf2894c 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -55,8 +55,8 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.lucene.uid.VersionsResolver; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -464,7 +464,7 @@ protected final GetResult getFromSearcher(Get get, Function se final Searcher searcher = searcherFactory.apply("get"); final DocIdAndVersion docIdAndVersion; try { - docIdAndVersion = VersionsResolver.loadDocIdAndVersion(searcher.reader(), get.uid()); + docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid()); } catch (Exception e) { Releasables.closeWhileHandlingException(searcher); //TODO: A better exception goes here diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 333dd769eaf68..90d3b054d642f 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -51,7 +51,8 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.lucene.uid.VersionsResolver; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -416,6 +417,43 @@ enum OpVsLuceneDocStatus { LUCENE_DOC_NOT_FOUND } + private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { + assert op.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "resolving ops based seq# but no seqNo is found"; + final OpVsLuceneDocStatus status; + final VersionValue versionValue = versionMap.getUnderLock(op.uid()); + assert incrementVersionLookup(); + if (versionValue != null) { + if (op.seqNo() > versionValue.getSeqNo() || + (op.seqNo() == versionValue.getSeqNo() && op.primaryTerm() > versionValue.getTerm())) + status = OpVsLuceneDocStatus.OP_NEWER; + else { + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } + } else { + // load from index + assert incrementIndexVersionLookup(); + try (Searcher searcher = acquireSearcher("load_seq_no")) { + DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid()); + if (docAndSeqNo == null) { + status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; + } else if (op.seqNo() > docAndSeqNo.seqNo) { + status = OpVsLuceneDocStatus.OP_NEWER; + } else if (op.seqNo() == docAndSeqNo.seqNo) { + // load term to tie break + final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo); + if (op.primaryTerm() > existingTerm) { + status = OpVsLuceneDocStatus.OP_NEWER; + } else { + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } + } else { + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } + } + } + return status; + } + /** resolves the current version of the document, returning null if not found */ private VersionValue resolveDocVersion(final Operation op) throws IOException { assert incrementVersionLookup(); // used for asserting in tests @@ -424,7 +462,7 @@ private VersionValue resolveDocVersion(final Operation op) throws IOException { assert incrementIndexVersionLookup(); // used for asserting in tests final long currentVersion = loadCurrentVersionFromIndex(op.uid()); if (currentVersion != Versions.NOT_FOUND) { - versionValue = new VersionValue(currentVersion); + versionValue = new VersionValue(currentVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0L); } } else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() && (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.getTime()) > @@ -436,6 +474,7 @@ private VersionValue resolveDocVersion(final Operation op) throws IOException { private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation op) throws IOException { + assert op.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO : "op is resolved based versions but have a seq#"; assert op.version() >= 0 : "versions should be non-negative. got " + op.version(); final VersionValue versionValue = resolveDocVersion(op); if (versionValue == null) { @@ -601,7 +640,14 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index); + final OpVsLuceneDocStatus opVsLucene; + if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); + } else { + assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) : + "index is newly created but op has no sequence numbers. op: " + index; + opVsLucene = compareOpToLuceneDocBasedOnVersions(index); + } if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else { @@ -671,9 +717,9 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); index(index.docs(), indexWriter); } - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(plan.versionForIndexing)); - return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, - plan.currentNotFoundOrDeleted); + versionMap.putUnderLock(index.uid().bytes(), + new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); + return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } catch (Exception ex) { if (indexWriter.getTragicException() == null) { /* There is no tragic event recorded so this must be a document failure. @@ -873,7 +919,14 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept // unlike the primary, replicas don't really care to about found status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return true for the found flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(delete); + final OpVsLuceneDocStatus opVsLucene; + if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); + } else { + assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) : + "index is newly created but op has no sequence numbers. op: " + delete; + opVsLucene = compareOpToLuceneDocBasedOnVersions(delete); + } final DeletionStrategy plan; if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { @@ -923,7 +976,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) indexWriter.deleteDocuments(delete.uid()); } versionMap.putUnderLock(delete.uid().bytes(), - new DeleteVersionValue(plan.versionOfDeletion, + new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(), engineConfig.getThreadPool().relativeTimeInMillis())); return new DeleteResult( plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); @@ -1490,7 +1543,7 @@ private Releasable acquireLock(Term uid) { private long loadCurrentVersionFromIndex(Term uid) throws IOException { assert incrementIndexVersionLookup(); try (Searcher searcher = acquireSearcher("load_version")) { - return VersionsResolver.loadVersion(searcher.reader(), uid); + return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid); } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java index 53550578cc3a0..f6844ee792bf2 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java @@ -32,8 +32,15 @@ class VersionValue implements Accountable { /** the version of the document. used for versioned indexed operations and as a BWC layer, where no seq# are set yet */ private final long version; - VersionValue(long version) { + /** the seq number of the operation that last changed the associated uuid */ + private final long seqNo; + /** the the term of the operation that last changed the associated uuid */ + private final long term; + + VersionValue(long version, long seqNo, long term) { this.version = version; + this.seqNo = seqNo; + this.term = term; } public long getTime() { @@ -48,6 +55,14 @@ public boolean isDelete() { return false; } + public long getSeqNo() { + return seqNo; + } + + public long getTerm() { + return term; + } + @Override public long ramBytesUsed() { return BASE_RAM_BYTES_USED; @@ -61,6 +76,9 @@ public Collection getChildResources() { @Override public String toString() { return "VersionValue{" + - "version=" + version + "}"; + "version=" + version + + ", seqNo=" + seqNo + + ", term=" + term + + '}'; } } diff --git a/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 33f55c7a91664..6d3e1e3ab6a56 100644 --- a/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -24,7 +24,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.util.set.Sets; diff --git a/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java b/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java index f1b5760e90116..64c4932e47017 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java @@ -254,12 +254,12 @@ public void version(Field version) { } @Override - public SeqNoFieldMapper.SequenceID seqID() { + public SeqNoFieldMapper.SequenceIDFields seqID() { return in.seqID(); } @Override - public void seqID(SeqNoFieldMapper.SequenceID seqID) { + public void seqID(SeqNoFieldMapper.SequenceIDFields seqID) { in.seqID(seqID); } @@ -310,7 +310,7 @@ public static class InternalParseContext extends ParseContext { private Field version; - private SeqNoFieldMapper.SequenceID seqID; + private SeqNoFieldMapper.SequenceIDFields seqID; private final AllEntries allEntries; @@ -404,12 +404,12 @@ public void version(Field version) { } @Override - public SeqNoFieldMapper.SequenceID seqID() { + public SeqNoFieldMapper.SequenceIDFields seqID() { return this.seqID; } @Override - public void seqID(SeqNoFieldMapper.SequenceID seqID) { + public void seqID(SeqNoFieldMapper.SequenceIDFields seqID) { this.seqID = seqID; } @@ -539,9 +539,9 @@ public boolean isWithinMultiFields() { public abstract void version(Field version); - public abstract SeqNoFieldMapper.SequenceID seqID(); + public abstract SeqNoFieldMapper.SequenceIDFields seqID(); - public abstract void seqID(SeqNoFieldMapper.SequenceID seqID); + public abstract void seqID(SeqNoFieldMapper.SequenceIDFields seqID); public final boolean includeInAll(Boolean includeInAll, FieldMapper mapper) { return includeInAll(includeInAll, mapper.fieldType().indexOptions() != IndexOptions.NONE); diff --git a/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java b/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java index f7d5804be0d53..91cf2aa4fa4a8 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java @@ -36,7 +36,7 @@ public class ParsedDocument { private final String id, type; private final BytesRef uid; - private final SeqNoFieldMapper.SequenceID seqID; + private final SeqNoFieldMapper.SequenceIDFields seqID; private final String routing; @@ -50,7 +50,7 @@ public class ParsedDocument { private String parent; public ParsedDocument(Field version, - SeqNoFieldMapper.SequenceID seqID, + SeqNoFieldMapper.SequenceIDFields seqID, String id, String type, String routing, diff --git a/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java index 9f844a3371e1a..9612d94e661ce 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java @@ -22,7 +22,6 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; -import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexableField; @@ -66,13 +65,13 @@ public class SeqNoFieldMapper extends MetadataFieldMapper { * A sequence ID, which is made up of a sequence number (both the searchable * and doc_value version of the field) and the primary term. */ - public static class SequenceID { + public static class SequenceIDFields { public final Field seqNo; public final Field seqNoDocValue; public final Field primaryTerm; - public SequenceID(Field seqNo, Field seqNoDocValue, Field primaryTerm) { + public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm) { Objects.requireNonNull(seqNo, "sequence number field cannot be null"); Objects.requireNonNull(seqNoDocValue, "sequence number dv field cannot be null"); Objects.requireNonNull(primaryTerm, "primary term field cannot be null"); @@ -81,9 +80,9 @@ public SequenceID(Field seqNo, Field seqNoDocValue, Field primaryTerm) { this.primaryTerm = primaryTerm; } - public static SequenceID emptySeqID() { - return new SequenceID(new LongPoint(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO), - new SortedNumericDocValuesField(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO), + public static SequenceIDFields emptySeqID() { + return new SequenceIDFields(new LongPoint(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO), + new NumericDocValuesField(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO), new NumericDocValuesField(PRIMARY_TERM_NAME, 0)); } } @@ -242,7 +241,7 @@ public void preParse(ParseContext context) throws IOException { protected void parseCreateField(ParseContext context, List fields) throws IOException { // see InternalEngine.innerIndex to see where the real version value is set // also see ParsedDocument.updateSeqID (called by innerIndex) - SequenceID seqID = SequenceID.emptySeqID(); + SequenceIDFields seqID = SequenceIDFields.emptySeqID(); context.seqID(seqID); fields.add(seqID.seqNo); fields.add(seqID.seqNoDocValue); @@ -264,7 +263,7 @@ public void postParse(ParseContext context) throws IOException { for (int i = 1; i < context.docs().size(); i++) { final Document doc = context.docs().get(i); doc.add(new LongPoint(NAME, 1)); - doc.add(new SortedNumericDocValuesField(NAME, 1L)); + doc.add(new NumericDocValuesField(NAME, 1L)); doc.add(new NumericDocValuesField(PRIMARY_TERM_NAME, 0L)); } } diff --git a/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java b/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java index 77d8204e45d0b..6351282a38a18 100644 --- a/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java +++ b/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java @@ -34,7 +34,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java index d771ced56ffe4..05922ee85ec95 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java @@ -31,7 +31,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.test.ESTestCase; @@ -53,7 +53,7 @@ public void testSimple() throws Exception { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); LeafReaderContext segment = reader.leaves().get(0); - PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader()); + PerThreadIDAndVersionSeqNoLookup lookup = new PerThreadIDAndVersionSeqNoLookup(segment.reader()); // found doc DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); assertNotNull(result); @@ -81,7 +81,7 @@ public void testTwoDocuments() throws Exception { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); LeafReaderContext segment = reader.leaves().get(0); - PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader()); + PerThreadIDAndVersionSeqNoLookup lookup = new PerThreadIDAndVersionSeqNoLookup(segment.reader()); // return the last doc when there are duplicates DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); assertNotNull(result); diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java index 6b9960294e41d..c5e66a3bf2ad5 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java @@ -38,8 +38,8 @@ import java.util.ArrayList; import java.util.List; -import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadDocIdAndVersion; -import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadVersion; +import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadDocIdAndVersion; +import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadVersion; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -145,7 +145,7 @@ public void testNestedDocuments() throws IOException { /** Test that version map cache works, is evicted on close, etc */ public void testCache() throws Exception { - int size = VersionsResolver.lookupStates.size(); + int size = VersionsAndSeqNoResolver.lookupStates.size(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); @@ -156,21 +156,21 @@ public void testCache() throws Exception { DirectoryReader reader = DirectoryReader.open(writer); // should increase cache size by 1 assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); // should be cache hit assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); reader.close(); writer.close(); // core should be evicted from the map - assertEquals(size, VersionsResolver.lookupStates.size()); + assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size()); dir.close(); } /** Test that version map cache behaves properly with a filtered reader */ public void testCacheFilterReader() throws Exception { - int size = VersionsResolver.lookupStates.size(); + int size = VersionsAndSeqNoResolver.lookupStates.size(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); @@ -180,17 +180,17 @@ public void testCacheFilterReader() throws Exception { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); // now wrap the reader DirectoryReader wrapped = ElasticsearchDirectoryReader.wrap(reader, new ShardId("bogus", "_na_", 5)); assertEquals(87, loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6"))); // same size map: core cache key is shared - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); reader.close(); writer.close(); // core should be evicted from the map - assertEquals(size, VersionsResolver.lookupStates.size()); + assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size()); dir.close(); } } diff --git a/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java b/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java index c815b2b55f9f3..daf196da7ce86 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java @@ -41,7 +41,7 @@ public class IndexingSlowLogTests extends ESTestCase { public void testSlowLogParsedDocumentPrinterSourceToLog() throws IOException { BytesReference source = JsonXContent.contentBuilder().startObject().field("foo", "bar").endObject().bytes(); - ParsedDocument pd = new ParsedDocument(new NumericDocValuesField("version", 1), SeqNoFieldMapper.SequenceID.emptySeqID(), "id", + ParsedDocument pd = new ParsedDocument(new NumericDocValuesField("version", 1), SeqNoFieldMapper.SequenceIDFields.emptySeqID(), "id", "test", null, null, source, XContentType.JSON, null); Index index = new Index("foo", "123"); // Turning off document logging doesn't log source[] diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d61526a8bd363..5d2755dc25b91 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -83,7 +83,8 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.lucene.uid.VersionsResolver; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -290,7 +291,7 @@ public static ParsedDocument createParsedDoc(String id, String type, String rout private static ParsedDocument testParsedDocument(String id, String type, String routing, Document document, BytesReference source, Mapping mappingUpdate) { Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); document.add(uidField); document.add(versionField); document.add(seqID.seqNo); @@ -3568,9 +3569,17 @@ public long generateSeqNo() { */ private Tuple getSequenceID(Engine engine, Engine.Get get) throws EngineException { try (Searcher searcher = engine.acquireSearcher("get")) { - long seqNum = VersionsResolver.loadSeqNo(searcher.reader(), get.uid()); - long primaryTerm = VersionsResolver.loadPrimaryTerm(searcher.reader(), get.uid()); - return new Tuple<>(seqNum, primaryTerm); + final long primaryTerm; + final long seqNo; + DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), get.uid()); + if (docIdAndSeqNo == null) { + primaryTerm = 0; + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } else { + seqNo = docIdAndSeqNo.seqNo; + primaryTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docIdAndSeqNo); + } + return new Tuple<>(seqNo, primaryTerm); } catch (Exception e) { throw new EngineException(shardId, "unable to retrieve sequence id", e); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java b/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java index 9161bc413c86c..97799f8c46a62 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java @@ -33,7 +33,7 @@ public void testRamBytesUsed() throws Exception { for (int i = 0; i < 100000; ++i) { BytesRefBuilder uid = new BytesRefBuilder(); uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20)); - VersionValue version = new VersionValue(randomLong()); + VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong()); map.putUnderLock(uid.toBytesRef(), version); } long actualRamBytesUsed = RamUsageTester.sizeOf(map); @@ -48,7 +48,7 @@ public void testRamBytesUsed() throws Exception { for (int i = 0; i < 100000; ++i) { BytesRefBuilder uid = new BytesRefBuilder(); uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20)); - VersionValue version = new VersionValue(randomLong()); + VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong()); map.putUnderLock(uid.toBytesRef(), version); } actualRamBytesUsed = RamUsageTester.sizeOf(map); diff --git a/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java b/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java index 7af8ebc7580f9..3b953edece1b4 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java @@ -25,12 +25,12 @@ public class VersionValueTests extends ESTestCase { public void testRamBytesUsed() { - VersionValue versionValue = new VersionValue(randomLong()); + VersionValue versionValue = new VersionValue(randomLong(), randomLong(), randomLong()); assertEquals(RamUsageTester.sizeOf(versionValue), versionValue.ramBytesUsed()); } public void testDeleteRamBytesUsed() { - DeleteVersionValue versionValue = new DeleteVersionValue(randomLong(), randomLong()); + DeleteVersionValue versionValue = new DeleteVersionValue(randomLong(), randomLong(), randomLong(), randomLong()); assertEquals(RamUsageTester.sizeOf(versionValue), versionValue.ramBytesUsed()); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index d203832fb15ec..fec0b766d3490 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -107,7 +107,7 @@ private ParsedDocument testParsedDocument(String id, String type, String routing Mapping mappingUpdate) { Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); document.add(uidField); document.add(versionField); document.add(seqID.seqNo); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b328e86e58ded..629a8af3e0d3c 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -551,7 +551,7 @@ private ParsedDocument testParsedDocument(String id, String type, String routing ParseContext.Document document, BytesReference source, Mapping mappingUpdate) { Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); document.add(uidField); document.add(versionField); document.add(seqID.seqNo); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 846ca6be2014a..b7e20cf75c83c 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -332,7 +332,7 @@ private Engine.IndexResult index(String id, String testFieldValue) throws IOExce document.add(new TextField("test", testFieldValue, Field.Store.YES)); Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY); - SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); document.add(uidField); document.add(versionField); document.add(seqID.seqNo); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 6b2aa5e59215e..ae430b72c4fe1 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2048,7 +2048,7 @@ public static Translog.Location randomTranslogLocation() { public void testTranslogOpSerialization() throws Exception { BytesReference B_1 = new BytesArray(new byte[]{1}); - SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); assert Version.CURRENT.major <= 6 : "Using UNASSIGNED_SEQ_NO can be removed in 7.0, because 6.0+ nodes have actual sequence numbers"; long randomSeqNum = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong(); long randomPrimaryTerm = randomBoolean() ? 0 : randomNonNegativeLong(); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 743510e373c50..40a92b11e7372 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -204,7 +204,7 @@ private Engine.Index getIndex(final String id) { document.add(new TextField("test", "test", Field.Store.YES)); final Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); final Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY); - final SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); document.add(uidField); document.add(versionField); document.add(seqID.seqNo); From 825b8a18d9c211d5c063c6b38be8a5eaeab6401d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 12 Apr 2017 08:43:57 +0200 Subject: [PATCH 3/6] versions can be final --- .../main/java/org/elasticsearch/common/lucene/uid/Versions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java index 8cc612421551c..b7c62a8f2443a 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java @@ -19,7 +19,7 @@ package org.elasticsearch.common.lucene.uid; -public class Versions { +public final class Versions { /** used to indicate the write operation should succeed regardless of current version **/ public static final long MATCH_ANY = -3L; From 35288771850d9a322edbef6ad1031dcc04448bcd Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 12 Apr 2017 08:45:41 +0200 Subject: [PATCH 4/6] assert text --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 90d3b054d642f..ec8ee269c14c1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -474,7 +474,7 @@ private VersionValue resolveDocVersion(final Operation op) throws IOException { private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation op) throws IOException { - assert op.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO : "op is resolved based versions but have a seq#"; + assert op.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO : "op is resolved based on versions but have a seq#"; assert op.version() >= 0 : "versions should be non-negative. got " + op.version(); final VersionValue versionValue = resolveDocVersion(op); if (versionValue == null) { From bdd5a48862b3a101e3f43487b04b71cbce04670a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 13 Apr 2017 15:32:22 +0200 Subject: [PATCH 5/6] feedback --- .../lucene/uid/VersionsAndSeqNoResolver.java | 6 ++--- .../index/engine/DeleteVersionValue.java | 13 +++------- .../index/engine/InternalEngine.java | 25 ++++++++++--------- .../index/engine/LiveVersionMap.java | 8 +++--- .../index/engine/VersionValue.java | 22 +++------------- 5 files changed, 27 insertions(+), 47 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index a402f9817385a..ccff3dd67cb73 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -35,7 +35,7 @@ import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND; /** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */ -public class VersionsAndSeqNoResolver { +public final class VersionsAndSeqNoResolver { static final ConcurrentMap> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); @@ -113,7 +113,7 @@ public DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) { * */ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException { - assert term.field().equals(UidFieldMapper.NAME); + assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field(); List leaves = reader.leaves(); if (leaves.isEmpty()) { return null; @@ -139,7 +139,7 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) * */ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException { - assert term.field().equals(UidFieldMapper.NAME); + assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field(); List leaves = reader.leaves(); if (leaves.isEmpty()) { return null; diff --git a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java index 9ba084a1a9f69..899c06eb19637 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java @@ -27,18 +27,13 @@ class DeleteVersionValue extends VersionValue { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(DeleteVersionValue.class); - private final long time; + final long time; DeleteVersionValue(long version,long seqNo, long term, long time) { super(version, seqNo, term); this.time = time; } - @Override - public long getTime() { - return this.time; - } - @Override public boolean isDelete() { return true; @@ -52,9 +47,9 @@ public long ramBytesUsed() { @Override public String toString() { return "DeleteVersionValue{" + - "version=" + getVersion() + - ", seqNo=" + getSeqNo() + - ", term=" + getTerm() + + "version=" + version + + ", seqNo=" + seqNo + + ", term=" + term + ",time=" + time + '}'; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ec8ee269c14c1..7d2ff08f42e13 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -390,10 +390,10 @@ public GetResult get(Get get, Function searcherFactory) throws if (versionValue.isDelete()) { return GetResult.NOT_EXISTS; } - if (get.versionType().isVersionConflictForReads(versionValue.getVersion(), get.version())) { + if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) { Uid uid = Uid.createUid(get.uid().text()); throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), - get.versionType().explainConflictForReads(versionValue.getVersion(), get.version())); + get.versionType().explainConflictForReads(versionValue.version, get.version())); } refresh("realtime_get"); } @@ -423,8 +423,8 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) final VersionValue versionValue = versionMap.getUnderLock(op.uid()); assert incrementVersionLookup(); if (versionValue != null) { - if (op.seqNo() > versionValue.getSeqNo() || - (op.seqNo() == versionValue.getSeqNo() && op.primaryTerm() > versionValue.getTerm())) + if (op.seqNo() > versionValue.seqNo || + (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) status = OpVsLuceneDocStatus.OP_NEWER; else { status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; @@ -465,8 +465,7 @@ private VersionValue resolveDocVersion(final Operation op) throws IOException { versionValue = new VersionValue(currentVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0L); } } else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() && - (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.getTime()) > - getGcDeletesInMillis()) { + (engineConfig.getThreadPool().relativeTimeInMillis() - ((DeleteVersionValue)versionValue).time) > getGcDeletesInMillis()) { versionValue = null; } return versionValue; @@ -480,7 +479,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation if (versionValue == null) { return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; } else { - return op.versionType().isVersionConflictForWrites(versionValue.getVersion(), op.version(), versionValue.isDelete()) ? + return op.versionType().isVersionConflictForWrites(versionValue.version, op.version(), versionValue.isDelete()) ? OpVsLuceneDocStatus.OP_STALE_OR_EQUAL : OpVsLuceneDocStatus.OP_NEWER; } } @@ -644,6 +643,8 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); } else { + // This can happen if the primary is still on an old node and send traffic without seq# or we recover from translog + // created by an old version. assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) : "index is newly created but op has no sequence numbers. op: " + index; opVsLucene = compareOpToLuceneDocBasedOnVersions(index); @@ -679,7 +680,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { currentVersion = Versions.NOT_FOUND; currentNotFoundOrDeleted = true; } else { - currentVersion = versionValue.getVersion(); + currentVersion = versionValue.version; currentNotFoundOrDeleted = versionValue.isDelete(); } if (index.versionType().isVersionConflictForWrites( @@ -951,7 +952,7 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException currentVersion = Versions.NOT_FOUND; currentlyDeleted = true; } else { - currentVersion = versionValue.getVersion(); + currentVersion = versionValue.version; currentlyDeleted = versionValue.isDelete(); } final DeletionStrategy plan; @@ -1288,14 +1289,14 @@ private void pruneDeletedTombstones() { // TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock... // we only need to prune the deletes map; the current/old version maps are cleared on refresh: - for (Map.Entry entry : versionMap.getAllTombstones()) { + for (Map.Entry entry : versionMap.getAllTombstones()) { BytesRef uid = entry.getKey(); try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set? // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator: - VersionValue versionValue = versionMap.getTombstoneUnderLock(uid); + DeleteVersionValue versionValue = versionMap.getTombstoneUnderLock(uid); if (versionValue != null) { - if (timeMSec - versionValue.getTime() > getGcDeletesInMillis()) { + if (timeMSec - versionValue.time > getGcDeletesInMillis()) { versionMap.removeTombstoneUnderLock(uid); } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 7233420309c72..9ee4bd43c2129 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -55,7 +55,7 @@ private static class Maps { } // All deletes also go here, and delete "tombstones" are retained after refresh: - private final Map tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + private final Map tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); private volatile Maps maps = new Maps(); @@ -180,7 +180,7 @@ void putUnderLock(BytesRef uid, VersionValue version) { final VersionValue prevTombstone; if (version.isDelete()) { // Also enroll the delete into tombstones, and account for its RAM too: - prevTombstone = tombstones.put(uid, version); + prevTombstone = tombstones.put(uid, (DeleteVersionValue)version); // We initially account for BytesRef/VersionValue RAM for a delete against the tombstones, because this RAM will not be freed up // on refresh. Later, in removeTombstoneUnderLock, if we clear the tombstone entry but the delete remains in current, we shift @@ -225,12 +225,12 @@ void removeTombstoneUnderLock(BytesRef uid) { } /** Caller has a lock, so that this uid will not be concurrently added/deleted by another thread. */ - VersionValue getTombstoneUnderLock(BytesRef uid) { + DeleteVersionValue getTombstoneUnderLock(BytesRef uid) { return tombstones.get(uid); } /** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */ - Iterable> getAllTombstones() { + Iterable> getAllTombstones() { return tombstones.entrySet(); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java index f6844ee792bf2..1c2fa3005207d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java @@ -30,12 +30,12 @@ class VersionValue implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class); /** the version of the document. used for versioned indexed operations and as a BWC layer, where no seq# are set yet */ - private final long version; + final long version; /** the seq number of the operation that last changed the associated uuid */ - private final long seqNo; + final long seqNo; /** the the term of the operation that last changed the associated uuid */ - private final long term; + final long term; VersionValue(long version, long seqNo, long term) { this.version = version; @@ -43,26 +43,10 @@ class VersionValue implements Accountable { this.term = term; } - public long getTime() { - throw new UnsupportedOperationException(); - } - - public long getVersion() { - return version; - } - public boolean isDelete() { return false; } - public long getSeqNo() { - return seqNo; - } - - public long getTerm() { - return term; - } - @Override public long ramBytesUsed() { return BASE_RAM_BYTES_USED; From a59a39a9068c6ed6c3d322dc6ec5fdcd453f5277 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 14 Apr 2017 21:20:59 +0200 Subject: [PATCH 6/6] feedback --- ... => PerThreadIDVersionAndSeqNoLookup.java} | 14 +++++--- .../lucene/uid/VersionsAndSeqNoResolver.java | 35 +++++++++---------- .../index/engine/InternalEngine.java | 2 +- .../common/lucene/uid/VersionLookupTests.java | 4 +-- 4 files changed, 28 insertions(+), 27 deletions(-) rename core/src/main/java/org/elasticsearch/common/lucene/uid/{PerThreadIDAndVersionSeqNoLookup.java => PerThreadIDVersionAndSeqNoLookup.java} (92%) diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionSeqNoLookup.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java similarity index 92% rename from core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionSeqNoLookup.java rename to core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index 4c7403b2ca5c7..80977618c4bc2 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionSeqNoLookup.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -46,7 +46,7 @@ * in more than one document! It will only return the first one it * finds. */ -final class PerThreadIDAndVersionSeqNoLookup { +final class PerThreadIDVersionAndSeqNoLookup { // TODO: do we really need to store all this stuff? some if it might not speed up anything. // we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff @@ -67,7 +67,7 @@ final class PerThreadIDAndVersionSeqNoLookup { /** * Initialize lookup for the provided segment */ - PerThreadIDAndVersionSeqNoLookup(LeafReader reader) throws IOException { + PerThreadIDVersionAndSeqNoLookup(LeafReader reader) throws IOException { Fields fields = reader.fields(); Terms terms = fields.terms(UidFieldMapper.NAME); termsEnum = terms.iterator(); @@ -123,7 +123,7 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException { } /** Return null if id is not found. */ - public DocIdAndSeqNo lookupSequenceNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException { + DocIdAndSeqNo lookupSeqNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException { assert context.reader().getCoreCacheKey().equals(readerKey) : "context's reader is not the same as the reader class was initialized on."; int docID = getDocID(id, liveDocs); @@ -134,8 +134,12 @@ public DocIdAndSeqNo lookupSequenceNo(BytesRef id, Bits liveDocs, LeafReaderCont } } - /** returns 0 if the primary term is not found */ - public long lookUpPrimaryTerm(int docID) throws IOException { + /** + * returns 0 if the primary term is not found. + * + * Note that 0 is an illegal primary term. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)} + **/ + long lookUpPrimaryTerm(int docID) throws IOException { return primaryTerms == null ? 0 : primaryTerms.get(docID); } } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index ccff3dd67cb73..1cbae29a3da69 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -37,39 +37,36 @@ /** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */ public final class VersionsAndSeqNoResolver { - static final ConcurrentMap> lookupStates = + static final ConcurrentMap> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); // Evict this reader from lookupStates once it's closed: private static final CoreClosedListener removeLookupState = key -> { - CloseableThreadLocal ctl = lookupStates.remove(key); + CloseableThreadLocal ctl = lookupStates.remove(key); if (ctl != null) { ctl.close(); } }; - private static PerThreadIDAndVersionSeqNoLookup getLookupState(LeafReader reader) throws IOException { + private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader) throws IOException { Object key = reader.getCoreCacheKey(); - CloseableThreadLocal ctl = lookupStates.get(key); + CloseableThreadLocal ctl = lookupStates.get(key); if (ctl == null) { - // First time we are seeing this reader's core; make a - // new CTL: + // First time we are seeing this reader's core; make a new CTL: ctl = new CloseableThreadLocal<>(); - CloseableThreadLocal other = lookupStates.putIfAbsent(key, ctl); + CloseableThreadLocal other = lookupStates.putIfAbsent(key, ctl); if (other == null) { - // Our CTL won, we must remove it when the - // core is closed: + // Our CTL won, we must remove it when the core is closed: reader.addCoreClosedListener(removeLookupState); } else { - // Another thread beat us to it: just use - // their CTL: + // Another thread beat us to it: just use their CTL: ctl = other; } } - PerThreadIDAndVersionSeqNoLookup lookupState = ctl.get(); + PerThreadIDVersionAndSeqNoLookup lookupState = ctl.get(); if (lookupState == null) { - lookupState = new PerThreadIDAndVersionSeqNoLookup(reader); + lookupState = new PerThreadIDVersionAndSeqNoLookup(reader); ctl.set(lookupState); } @@ -85,7 +82,7 @@ public static class DocIdAndVersion { public final long version; public final LeafReaderContext context; - public DocIdAndVersion(int docId, long version, LeafReaderContext context) { + DocIdAndVersion(int docId, long version, LeafReaderContext context) { this.docId = docId; this.version = version; this.context = context; @@ -98,7 +95,7 @@ public static class DocIdAndSeqNo { public final long seqNo; public final LeafReaderContext context; - public DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) { + DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) { this.docId = docId; this.seqNo = seqNo; this.context = context; @@ -123,7 +120,7 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) for (int i = leaves.size() - 1; i >= 0; i--) { LeafReaderContext context = leaves.get(i); LeafReader leaf = context.reader(); - PerThreadIDAndVersionSeqNoLookup lookup = getLookupState(leaf); + PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf); DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context); if (result != null) { return result; @@ -149,8 +146,8 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr for (int i = leaves.size() - 1; i >= 0; i--) { LeafReaderContext context = leaves.get(i); LeafReader leaf = context.reader(); - PerThreadIDAndVersionSeqNoLookup lookup = getLookupState(leaf); - DocIdAndSeqNo result = lookup.lookupSequenceNo(term.bytes(), leaf.getLiveDocs(), context); + PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf); + DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf.getLiveDocs(), context); if (result != null) { return result; } @@ -163,7 +160,7 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr */ public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo) throws IOException { LeafReader leaf = docIdAndSeqNo.context.reader(); - PerThreadIDAndVersionSeqNoLookup lookup = getLookupState(leaf); + PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf); long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId); assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]" + " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]"; diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7d2ff08f42e13..b3053ba3f2df8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -418,7 +418,7 @@ enum OpVsLuceneDocStatus { } private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { - assert op.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "resolving ops based seq# but no seqNo is found"; + assert op.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final OpVsLuceneDocStatus status; final VersionValue versionValue = versionMap.getUnderLock(op.uid()); assert incrementVersionLookup(); diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java index 05922ee85ec95..8b68e76957058 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java @@ -53,7 +53,7 @@ public void testSimple() throws Exception { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); LeafReaderContext segment = reader.leaves().get(0); - PerThreadIDAndVersionSeqNoLookup lookup = new PerThreadIDAndVersionSeqNoLookup(segment.reader()); + PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader()); // found doc DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); assertNotNull(result); @@ -81,7 +81,7 @@ public void testTwoDocuments() throws Exception { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); LeafReaderContext segment = reader.leaves().get(0); - PerThreadIDAndVersionSeqNoLookup lookup = new PerThreadIDAndVersionSeqNoLookup(segment.reader()); + PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader()); // return the last doc when there are duplicates DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); assertNotNull(result);