Skip to content

Commit

Permalink
Use type of mapper instead of source when parse doc
Browse files Browse the repository at this point in the history
We introduced a typeless API in elastic#35790 where we translate the default
docType "_doc" to the user-defined docType. However, we still use the
docType from the source rather than the translated type (i.e., the type of
the Mapper) when parsing a document. This leads to a situation where we have
two translog operations for the same document with different types:

- prvOp [Index{id='9LCpwGcBkJN7eZxaB54L', type='_doc', seqNo=1, primaryTerm=1, version=1, autoGeneratedIdTimestamp=1545125562123}]
- newOp [Index{id='9LCpwGcBkJN7eZxaB54L', type='not_doc', seqNo=1, primaryTerm=1, version=1, autoGeneratedIdTimestamp=-1}]

Closes elastic#36769
  • Loading branch information
dnhatn committed Dec 21, 2018
1 parent a9834cd commit eed3717
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,15 @@ private static boolean isEmptyDoc(Mapping mapping, XContentParser parser) throws


private static ParsedDocument parsedDocument(SourceToParse source, ParseContext.InternalParseContext context, Mapping update) {
// use the docType of the mapper rather than of the source as we may have translated _doc to the user-defined type.
final String docType = context.docMapper().type();
assert source.type().equals(context.docMapper().type()) || source.type().equals(MapperService.SINGLE_MAPPING_NAME) :
"unexpected docType; mapper type [" + docType + " source type [" + source.type() + "]";
return new ParsedDocument(
context.version(),
context.seqID(),
context.sourceToParse().id(),
context.sourceToParse().type(),
docType,
source.routing(),
context.docs(),
context.sourceToParse().source(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -862,4 +863,23 @@ public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Excepti
client().search(countRequest).actionGet().getHits().getTotalHits().value, equalTo(numDocs + moreDocs));
}

/**
 * Indexes and deletes one document through the default "_doc" type and another through the
 * index's own "user_doc" type, then asserts that the operations drained from the shard's
 * changes snapshot equal the operations drained from its translog snapshot.
 */
public void testShardChangesWithDefaultDocType() throws Exception {
    final Settings indexSettings = Settings.builder()
        .put("index.number_of_shards", 1)
        .put("index.number_of_replicas", 0)
        // do not trim translog
        .put("index.translog.flush_threshold_size", "512mb")
        .put("index.soft_deletes.enabled", true)
        .build();
    final IndexService service = createIndex("index", indexSettings, "user_doc", "title", "type=keyword");

    // Document 1 is written and removed via the default doc type.
    client().prepareIndex("index", "_doc", "1").setSource("{}", XContentType.JSON).get();
    client().prepareDelete("index", "_doc", "1").get();

    // Document 2 is written and removed via the type the index was created with.
    client().prepareIndex("index", "user_doc", "2").setSource("{}", XContentType.JSON).get();
    client().prepareDelete("index", "user_doc", "2").get();

    final IndexShard primary = service.getShard(0);
    try (Translog.Snapshot changes = primary.newChangesSnapshot("test", 0, 3, true);
         Translog.Snapshot translog = getTranslog(primary).newSnapshot()) {
        // Both sides are sorted by seqNo so the comparison does not depend on read order.
        final List<Translog.Operation> fromLucene = TestTranslog.drainSnapshot(changes, true);
        final List<Translog.Operation> fromTranslog = TestTranslog.drainSnapshot(translog, true);
        assertThat(fromLucene, equalTo(fromTranslog));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -128,4 +130,16 @@ public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOExce
/**
 * Returns the primary term reported by the object that {@code translog.getCurrent()} yields.
 *
 * @param translog the translog to inspect
 * @return the value of {@code getPrimaryTerm()} on the translog's current element
 */
public static long getCurrentTerm(Translog translog) {
    final long currentPrimaryTerm = translog.getCurrent().getPrimaryTerm();
    return currentPrimaryTerm;
}

/**
 * Reads all remaining operations from the given snapshot into a new list.
 *
 * @param snapshot    the snapshot to consume; it is read until {@code next()} returns {@code null}
 * @param sortBySeqNo when {@code true}, the returned list is sorted by ascending {@code seqNo()};
 *                    when {@code false}, operations stay in the order the snapshot produced them
 * @return a new {@link ArrayList} holding the drained operations
 * @throws IOException declared because reading from the snapshot may throw it
 */
public static List<Translog.Operation> drainSnapshot(Translog.Snapshot snapshot, boolean sortBySeqNo) throws IOException {
    // Pre-size with the snapshot's reported operation count.
    final List<Translog.Operation> ops = new ArrayList<>(snapshot.totalOperations());
    for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
        ops.add(op);
    }
    if (sortBySeqNo) {
        // comparingLong compares the primitive value directly instead of boxing it for every
        // comparison (assumes seqNo() returns a long - TODO confirm against Translog.Operation).
        ops.sort(Comparator.comparingLong(Translog.Operation::seqNo));
    }
    return ops;
}
}

0 comments on commit eed3717

Please sign in to comment.