Skip to content

Commit

Permalink
Get from translog fails with large dense_vector (elastic#104700)
Browse files Browse the repository at this point in the history
This change fixes the engine to apply the current codec when retrieving documents from the translog.
We need to use the same codec than the main index in order to ensure that all the source data is indexable.
The internal codec treats some fields differently than the default one, for instance dense_vectors are limited to 1024 dimensions.
This PR ensures that these customizations are applied when indexing document for translog retrieval.

Closes elastic#104639

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
jimczi and elasticmachine committed Jan 30, 2024
1 parent 0c44459 commit f623229
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ private GetResult getFromTranslog(
index,
mappingLookup,
documentParser,
config().getAnalyzer(),
config(),
translogInMemorySegmentsCount::incrementAndGet
);
final Engine.Searcher searcher = new Engine.Searcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.index.engine;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.BaseTermsEnum;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.ByteVectorValues;
Expand Down Expand Up @@ -83,10 +82,10 @@ final class TranslogDirectoryReader extends DirectoryReader {
Translog.Index operation,
MappingLookup mappingLookup,
DocumentParser documentParser,
Analyzer analyzer,
EngineConfig engineConfig,
Runnable onSegmentCreated
) throws IOException {
this(new TranslogLeafReader(shardId, operation, mappingLookup, documentParser, analyzer, onSegmentCreated));
this(new TranslogLeafReader(shardId, operation, mappingLookup, documentParser, engineConfig, onSegmentCreated));
}

private TranslogDirectoryReader(TranslogLeafReader leafReader) throws IOException {
Expand Down Expand Up @@ -205,7 +204,7 @@ private static class TranslogLeafReader extends LeafReader {
private final Translog.Index operation;
private final MappingLookup mappingLookup;
private final DocumentParser documentParser;
private final Analyzer analyzer;
private final EngineConfig engineConfig;
private final Directory directory;
private final Runnable onSegmentCreated;

Expand All @@ -217,14 +216,14 @@ private static class TranslogLeafReader extends LeafReader {
Translog.Index operation,
MappingLookup mappingLookup,
DocumentParser documentParser,
Analyzer analyzer,
EngineConfig engineConfig,
Runnable onSegmentCreated
) {
this.shardId = shardId;
this.operation = operation;
this.mappingLookup = mappingLookup;
this.documentParser = documentParser;
this.analyzer = analyzer;
this.engineConfig = engineConfig;
this.onSegmentCreated = onSegmentCreated;
this.directory = new ByteBuffersDirectory();
this.uid = Uid.encodeId(operation.id());
Expand Down Expand Up @@ -264,7 +263,10 @@ private LeafReader createInMemoryLeafReader() {

parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
parsedDocs.version().setLongValue(operation.version());
final IndexWriterConfig writeConfig = new IndexWriterConfig(analyzer).setOpenMode(IndexWriterConfig.OpenMode.CREATE);
// To guarantee indexability, we configure the analyzer and codec using the main engine configuration
final IndexWriterConfig writeConfig = new IndexWriterConfig(engineConfig.getAnalyzer()).setOpenMode(
IndexWriterConfig.OpenMode.CREATE
).setCodec(engineConfig.getCodec());
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
writer.addDocument(parsedDocs.rootDoc());
final DirectoryReader reader = open(writer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.Arrays;
import java.util.function.LongSupplier;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
Expand Down Expand Up @@ -114,6 +115,20 @@ public void testGetFromTranslogWithSyntheticSource() throws IOException {
runGetFromTranslogWithOptions(docToIndex, sourceOptions, expectedFetchedSource, "\"long\"", 7L, true);
}

public void testGetFromTranslogWithDenseVector() throws IOException {
float[] vector = new float[2048];
for (int i = 0; i < vector.length; i++) {
vector[i] = randomFloat();
}
String docToIndex = Strings.format("""
{
"bar": %s,
"foo": "foo"
}
""", Arrays.toString(vector));
runGetFromTranslogWithOptions(docToIndex, "\"enabled\": true", docToIndex, "\"text\"", "foo", "\"dense_vector\"", false);
}

private void runGetFromTranslogWithOptions(
String docToIndex,
String sourceOptions,
Expand All @@ -122,23 +137,48 @@ private void runGetFromTranslogWithOptions(
Object expectedFooVal,
boolean sourceOnlyFetchCreatesInMemoryReader
) throws IOException {
IndexMetadata metadata = IndexMetadata.builder("test").putMapping(Strings.format("""
{
"properties": {
"foo": {
"type": %s,
"store": true
},
"bar": { "type": %s }
},
"_source": { %s }
}
}""", fieldType, fieldType, sourceOptions)).settings(indexSettings(IndexVersion.current(), 1, 1)).primaryTerm(0, 1).build();
runGetFromTranslogWithOptions(
docToIndex,
sourceOptions,
expectedResult,
fieldType,
expectedFooVal,
fieldType,
sourceOnlyFetchCreatesInMemoryReader
);
}

private void runGetFromTranslogWithOptions(
String docToIndex,
String sourceOptions,
String expectedResult,
String fieldTypeFoo,
Object expectedFooVal,
String fieldTypeBar,
boolean sourceOnlyFetchCreatesInMemoryReader
) throws IOException {
IndexMetadata metadata = IndexMetadata.builder("test")
.putMapping(Strings.format("""
{
"properties": {
"foo": {
"type": %s,
"store": true
},
"bar": { "type": %s }
},
"_source": { %s }
}
}""", fieldTypeFoo, fieldTypeBar, sourceOptions))
.settings(indexSettings(IndexVersion.current(), 1, 1))
.primaryTerm(0, 1)
.build();
IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, EngineTestCase.randomReaderWrapper());
recoverShardFromStore(primary);
LongSupplier translogInMemorySegmentCount = ((InternalEngine) primary.getEngine()).translogInMemorySegmentsCount::get;
long translogInMemorySegmentCountExpected = 0;
indexDoc(primary, "test", "0", docToIndex);
Engine.IndexResult res = indexDoc(primary, "test", "0", docToIndex);
assertTrue(res.isCreated());
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet = primary.getService().getForUpdate("0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));
Expand Down

0 comments on commit f623229

Please sign in to comment.