pre aggregate changes #22

Open · wants to merge 15 commits into 2.11
fixes
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
bharath-techie committed Feb 2, 2024

Verified - this commit was signed with the committer’s verified signature.
commit 92717898ce3c1e9717b77e572386de1c9978d1f6
1 change: 0 additions & 1 deletion server/build.gradle
@@ -97,7 +97,6 @@ if (!isEclipse) {
}
}


dependencies {

api project(':libs:opensearch-common')
Lucene.java
@@ -129,7 +129,7 @@
* @opensearch.internal
*/
public class Lucene {
public static final String LATEST_CODEC = "Lucene95";
public static final String LATEST_CODEC = "StarTreeCodec"; // TODO : this is a hack

public static final String SOFT_DELETES_FIELD = "__soft_deletes";

CodecService.java
@@ -70,8 +70,11 @@ public CodecService(@Nullable MapperService mapperService, IndexSettings indexSe
assert null != indexSettings;
if (mapperService == null) {
/**
* Todo : currently we don't have a single field with which we handle aggregation ( binary field etc )
* So no better way to test the changes then to change the default codec - this should be changed.
* Todo : currently we don't have a single field for which to use a per-field codec to handle aggregation,
* so there is no better way to test the changes than to change the default codec - This should be changed.
*
* There are issues with this, as restarting the process and reloading the indices results in errors:
* Lucene95Codec is read when reloading the indices ( solved now by using StarTreeCodec as the latest codec )
*/
codecs.put(DEFAULT_CODEC, new StarTreeCodec());
codecs.put(LZ4, new StarTreeCodec());
PerFieldMappingPostingFormatCodec.java
@@ -39,6 +39,7 @@
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.codec.freshstartree.codec.StarTreeCodec;
import org.opensearch.index.codec.freshstartree.codec.StarTreeDocValuesFormat;
import org.opensearch.index.mapper.CompletionFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
@@ -54,7 +55,7 @@
*
* @opensearch.internal
*/
public class PerFieldMappingPostingFormatCodec extends Lucene95Codec {
public class PerFieldMappingPostingFormatCodec extends StarTreeCodec { // TODO : this is a hack , can't extend StarTreeCodec
private final Logger logger;
private final MapperService mapperService;
private final DocValuesFormat dvFormat = new StarTreeDocValuesFormat();
@@ -64,8 +65,8 @@ public class PerFieldMappingPostingFormatCodec extends Lucene95Codec {
: "PerFieldMappingPostingFormatCodec must subclass the latest " + "lucene codec: " + Lucene.LATEST_CODEC;
}

public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService mapperService, Logger logger) {
super(compressionMode);
public PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode compressionMode, MapperService mapperService, Logger logger) {
super();
this.mapperService = mapperService;
this.logger = logger;
}
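The TODO above flags that extending StarTreeCodec is a hack (note that super() also drops the compression mode the constructor receives). A minimal sketch, not part of this PR, of one conventional alternative: wrap the concrete Lucene codec with Lucene's FilterCodec and override only the doc values format. StarTreeDocValuesFormat is from this PR; the class name is hypothetical. Segments written this way would still need the codec registered via SPI to be readable after restart, which is the reload problem the CodecService comment above works around.

// A minimal sketch, not part of this PR: delegate to the concrete Lucene codec
// through FilterCodec instead of subclassing it, overriding only the doc values
// format. StarTreeDocValuesFormat is from this PR; everything else is hypothetical.
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.opensearch.index.codec.freshstartree.codec.StarTreeDocValuesFormat;

public class DelegatingStarTreeCodec extends FilterCodec {
    private final DocValuesFormat dvFormat = new StarTreeDocValuesFormat();

    public DelegatingStarTreeCodec() {
        // Everything except doc values (postings, stored fields, ...) is
        // handled by the wrapped Lucene95Codec.
        super("DelegatingStarTreeCodec", new Lucene95Codec());
    }

    @Override
    public DocValuesFormat docValuesFormat() {
        return dvFormat; // only the doc values format is star-tree aware
    }
}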
BaseSingleTreeBuilder.java
@@ -180,7 +180,6 @@ private Map<Long, StarTreeBuilderUtils.TreeNode> constructNonStarNodes(int start
long nodeDimensionValue = getDimensionValue(startDocId, dimensionId);
for (int i = startDocId + 1; i < endDocId; i++) {
long dimensionValue = getDimensionValue(i, dimensionId);
// System.out.println("Dim value : " + dimensionValue );
if (dimensionValue != nodeDimensionValue) {
StarTreeBuilderUtils.TreeNode child = getNewNode();
child._dimensionId = dimensionId;
@@ -227,11 +226,6 @@ public void build()
long startTime = System.currentTimeMillis();
Iterator<Record> recordIterator = sortAndAggregateSegmentRecords(numSegmentRecords);
logger.info("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime));
// System.out.println(
// "== =============Finished sorting and aggregating star-tree in ms : " +
// (System.currentTimeMillis()
// - startTime));

build(recordIterator, false);
}

@@ -443,7 +437,7 @@ private Record createAggregatedDocs(StarTreeBuilderUtils.TreeNode node)
assert aggregatedRecord != null;
for (int i = node._dimensionId + 1; i < _numDimensions; i++) {
aggregatedRecord._dimensions[i] =
STAR_IN_DOC_VALUES_INDEX; // StarTreeV2Constants.STAR_IN_FORWARD_INDEX;
STAR_IN_DOC_VALUES_INDEX;
}
node._aggregatedDocId = _numDocs;
appendToStarTree(aggregatedRecord);
@@ -469,7 +463,7 @@ private Record createAggregatedDocs(StarTreeBuilderUtils.TreeNode node)
assert aggregatedRecord != null;
for (int i = node._dimensionId + 1; i < _numDimensions; i++) {
aggregatedRecord._dimensions[i] =
STAR_IN_DOC_VALUES_INDEX; // StarTreeV2Constants.STAR_IN_FORWARD_INDEX;
STAR_IN_DOC_VALUES_INDEX;
}
node._aggregatedDocId = _numDocs;
appendToStarTree(aggregatedRecord);
@@ -591,7 +585,6 @@ public void close()
success = true;
} catch (Exception e) {
throw new RuntimeException(e);
// System.out.println(e.getMessage());
} finally {
if (success) {
IOUtils.close(indexOutput);
OffHeapBufferedSingleTreeBuilder.java
@@ -28,6 +28,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.index.IndexFileNames;
@@ -44,15 +46,31 @@


/**
* Off heap implementation of star tree builder Segment records are sorted and aggregated completely
* off heap Star tree records are using mixed approach where we have a buffer of hashmap to doc ids
* and also a temp file This is done since star tree records file needs to be read and written at
* same time, sometimes latest changes are not present during read
* Off heap implementation of star tree builder
*
* Segment records are stored in a single file - segment.record - for sorting and aggregation ( we create a doc id
* array and swap doc ids in the array during sorting, based on the actual segment record contents in the file )
*
* Star tree records are stored in multiple files, as the algorithm is:
* 1. Initially create a set of aggregated records based on the segment records
* 2. Read the above set of records, create aggregated star records and append them
* 3. Repeat until we have all combinations
*
* We cannot append newly created aggregated records to the file we are reading, as Lucene doesn't allow appending
* to closed files. We cannot keep the 'IndexOutput' open and create an 'IndexInput' to read the content, as some
* of the recent content will not be visible in the reader. So we need to close the 'IndexOutput' before we create
* an 'IndexInput', and we cannot reopen an 'IndexOutput' - so the only option is to create a new file for new
* appends until the next read.
*
* So we keep a set of files and maintain a tracker array to track the start doc id for each file.
*/
public class OffHeapBufferedSingleTreeBuilder extends BaseSingleTreeBuilder {
private static final Logger logger = LogManager.getLogger(OffHeapBufferedSingleTreeBuilder.class);
private static final String SEGMENT_RECORD_FILE_NAME = "segment.record";
private static final String STAR_TREE_RECORD_FILE_NAME = "star-tree.record";


private final List<Long> _starTreeRecordOffsets;

private int _numReadableStarTreeRecords;
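A minimal sketch of the rotate-on-read pattern the javadoc above describes, with hypothetical class and file names (only the Directory/IndexOutput/IndexInput usage mirrors this class): appends go to the current file, and before any read the current file is sealed, its end doc id is recorded in a tracker map, and a fresh file is opened for subsequent appends.

// A minimal sketch (hypothetical names) of the rotate-on-read pattern described
// in the javadoc above. Lucene's IndexOutput cannot be reopened once closed, so
// whenever we need to read back appended records we seal the current file,
// record its end doc id in a tracker map, and start a new file for later appends.
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;

class RotatingRecordFiles {
    private final Directory directory;
    // file name -> exclusive end doc id; insertion order gives each file's doc id range
    private final Map<String, Integer> fileToEndDocId = new LinkedHashMap<>();
    private IndexOutput current;
    private int fileCount = 0;
    private int numDocs = 0;

    RotatingRecordFiles(Directory directory) throws IOException {
        this.directory = directory;
        this.current = directory.createOutput("star-tree.record_0", IOContext.DEFAULT);
    }

    void append(byte[] record) throws IOException {
        current.writeBytes(record, record.length);
        numDocs++;
    }

    // Seal the current file and open a fresh one for later appends, then locate
    // the sealed file whose doc id range contains docId.
    IndexInput sealAndOpenForRead(int docId) throws IOException {
        String sealedName = current.getName();
        current.close();
        fileToEndDocId.put(sealedName, numDocs);
        current = directory.createOutput("star-tree.record_" + (++fileCount), IOContext.DEFAULT);
        for (Map.Entry<String, Integer> entry : fileToEndDocId.entrySet()) {
            if (docId < entry.getValue()) {
                return directory.openInput(entry.getKey(), IOContext.DEFAULT);
            }
        }
        throw new IllegalArgumentException("doc id " + docId + " has not been flushed yet");
    }
}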
@@ -88,16 +106,8 @@ public OffHeapBufferedSingleTreeBuilder(IndexOutput output, List<String> dimensi
// TODO : create temp output
starTreeRecordFileOutput = state.directory.createOutput(starTreeRecordFileName, state.context);
starTreeFileCount++;
// CodecUtil.writeIndexHeader(
// starTreeRecordFileOutput,
// "STARTreeCodec",
// 0,
// state.segmentInfo.getId(),
// state.segmentSuffix);
segmentRecordFileOutput = state.directory.createOutput(segmentRecordFileName, state.context);

_starTreeRecordOffsets = new ArrayList<>();
//_starTreeRecordOffsets.add(0L);
}

@Override
@@ -233,9 +243,7 @@ private Record deserializeStarTreeRecord(RandomAccessInput buffer, long offset)
void appendRecord(Record record)
throws IOException {
byte[] bytes = serializeStarTreeRecord(record);
// System.out.println("Appending record : " + record.toString());
starTreeRecordFileOutput.writeBytes(bytes, bytes.length);
//System.out.println("Appending doc : " + _numDocs + "curr bytes : " + currBytes + " offset: " + _starTreeRecordOffsets.size());
_starTreeRecordOffsets.add(currBytes);
currBytes += bytes.length;
}
@@ -244,19 +252,13 @@ void appendRecord(Record record)
Record getStarTreeRecord(int docId)
throws IOException {
ensureBufferReadable(docId);
//System.out.println("Want star record of id : " + docId);
return deserializeStarTreeRecord(starTreeRecordRandomInput, _starTreeRecordOffsets.get(docId));
}

@Override
long getDimensionValue(int docId, int dimensionId)
throws IOException {
// System.out.println("doc id : " + docId + " _numReadableStarTreeRecords : " +
// _numReadableStarTreeRecords);
//System.out.println("Want dimension value record of id : " + docId);
ensureBufferReadable(docId, false, true);
// System.out.println("want offset : " + (_starTreeRecordOffsets.get(docId) + (dimensionId *
// Integer.BYTES)));
ensureBufferReadable(docId, false);
return starTreeRecordRandomInput.readLong(
(_starTreeRecordOffsets.get(docId) + (dimensionId * Long.BYTES)));
}
@@ -266,8 +268,6 @@ Iterator<Record> sortAndAggregateSegmentRecords(int numDocs)
throws IOException {
// Write all dimensions for segment records into the buffer, and sort all records using an int
// array
// PinotDataBuffer dataBuffer;
// long bufferSize = (long) numDocs * _numDimensions * Integer.BYTES;
int recordBytesLength = 0;
Integer[] sortedDocIds = new Integer[numDocs];
for (int i = 0; i < numDocs; i++) {
@@ -295,12 +295,10 @@ private Iterator<Record> sortRecords(Integer[] sortedDocIds, int numDocs, int re
IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, SEGMENT_RECORD_FILE_NAME),
state.context);
final long recordBytes = recordBytesLength;
logger.info("Segment record is of length : {}", segmentRecordFileInput.length());
segmentRandomInput = segmentRecordFileInput.randomAccessSlice(0, segmentRecordFileInput.length());

try {
// ArrayUtil.introSort(sortedDocIds, comparator);
// Arrays.sort(sortedDocIds, comparator);

QuickSorter.quickSort(0, numDocs, (i1, i2) -> {
long offset1 = (long) sortedDocIds[i1] * recordBytes;
long offset2 = (long) sortedDocIds[i2] * recordBytes;
@@ -321,8 +319,6 @@ private Iterator<Record> sortRecords(Integer[] sortedDocIds, int numDocs, int re
sortedDocIds[i1] = sortedDocIds[i2];
sortedDocIds[i2] = temp;
});

// System.out.println("Sorted doc ids : " + Arrays.toString(sortedDocIds));
} finally {
// segmentRecordFileInput.close();
// state.directory.deleteFile(IndexFileNames.segmentFileName(state.segmentInfo.name,
@@ -333,9 +329,9 @@ private Iterator<Record> sortRecords(Integer[] sortedDocIds, int numDocs, int re
// SEGMENT_RECORD_FILE_NAME)));
}
if(sortedDocIds != null)
System.out.println("Sorted doc ids length" + sortedDocIds.length);
logger.info("Sorted doc ids length" + sortedDocIds.length);
else
System.out.println("Sorted doc ids array is null");
logger.info("Sorted doc ids array is null");

// Create an iterator for aggregated records
return new Iterator<Record>() {
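The QuickSorter call above is an indirection sort: record bytes never move on disk, only the doc id array is permuted, and the comparator resolves each doc id to its offset (docId * recordBytes) in the segment record file. A minimal self-contained sketch of the same idea, assuming fixed-size records whose dimensions are serialized as longs; LongReader is a hypothetical stand-in for Lucene's RandomAccessInput.

// A minimal sketch of the indirection sort used above: record bytes stay on
// disk, only doc ids are permuted. LongReader is a hypothetical stand-in for
// Lucene's RandomAccessInput#readLong(long).
import java.util.Arrays;

final class IndirectRecordSort {
    interface LongReader { long readLong(long offset); }

    static Integer[] sortDocIds(int numDocs, int numDimensions, long recordBytes, LongReader reader) {
        Integer[] sortedDocIds = new Integer[numDocs];
        for (int i = 0; i < numDocs; i++) {
            sortedDocIds[i] = i;
        }
        Arrays.sort(sortedDocIds, (d1, d2) -> {
            // Compare two records dimension by dimension at their file offsets.
            long offset1 = (long) d1 * recordBytes;
            long offset2 = (long) d2 * recordBytes;
            for (int dim = 0; dim < numDimensions; dim++) {
                int cmp = Long.compare(
                    reader.readLong(offset1 + (long) dim * Long.BYTES),
                    reader.readLong(offset2 + (long) dim * Long.BYTES));
                if (cmp != 0) {
                    return cmp;
                }
            }
            return 0; // identical dimensions: these records will be aggregated together
        });
        return sortedDocIds;
    }
}

The PR's QuickSorter additionally takes an explicit swap callback; with a boxed Integer[], a plain Arrays.sort comparator achieves the same indirection.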
@@ -380,7 +376,6 @@ public Record getSegmentRecord(int docID, long recordBytes)
@Override
Iterator<Record> generateRecordsForStarNode(int startDocId, int endDocId, int dimensionId)
throws IOException {
//System.out.println("End doc id " + endDocId);
ensureBufferReadable(endDocId, true);

// Sort all records using an int array
@@ -460,53 +455,37 @@ private void ensureBufferReadable(int docId) throws IOException {
ensureBufferReadable(docId, false);
}

private void ensureBufferReadable(int docId, boolean endDoc) throws IOException {
ensureBufferReadable(docId, endDoc, false);
}

private void ensureBufferReadable(int docId, boolean endDocCheck, boolean dimIdCheck)
private void ensureBufferReadable(int docId, boolean endDocCheck)
throws IOException {

if (docId >= prevStartDocId && (( endDocCheck && docId <= _numReadableStarTreeRecords )
|| (!endDocCheck && docId < _numReadableStarTreeRecords)) ) {
return;
}
//System.out.println("want doc : " + docId + " dim : " + dimIdCheck);
IndexInput in = null;
if(docId < _numDocs ) {
try {
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId < entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
prevStartDocId = entry.getValue();
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId < entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
//System.out.println("First loop Current start : " + prevStartDocId + " - Current end : " + _numReadableStarTreeRecords);
this.prevStartDocId = prevStartDocId;
} finally {
// if (in != null) {
// in.close();
// }
prevStartDocId = entry.getValue();
}
this.prevStartDocId = prevStartDocId;
}

if(in != null) return;



//System.out.println("want doc 1 : " + docId + " num docs : " + _numDocs);
starTreeRecordFileOutput.close();
logger.info("Created a file : {} of size : {}" , segmentRecordFileOutput.getName(), segmentRecordFileOutput.getFilePointer());
fileToByteSizeMap.put(starTreeRecordFileOutput.getName(),
_numDocs);

//System.out.println("Star tree record file size : " + starTreeRecordFileOutput.getFilePointer());
//System.out.println("Star tree record file name : " + starTreeRecordFileOutput.getName());

starTreeRecordFileOutput.close();

String starTreeRecordFileName =
IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, STAR_TREE_RECORD_FILE_NAME) +
@@ -517,31 +496,22 @@ private void ensureBufferReadable(int docId, boolean endDocCheck, boolean dimIdC
starTreeFileCount++;

currBytes = 0;
// state.directory.sync(Collections.singleton(starTreeRecordFileOutput.getName()));
if (starTreeRecordRandomInput != null) {
starTreeRecordRandomInput = null;
}

try {
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId <= entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
//System.out.println("Setting start value : " + entry.getValue());
prevStartDocId = entry.getValue();
int prevStartDocId = 0;
for(Map.Entry<String, Integer> entry : fileToByteSizeMap.entrySet()) {
if(docId <= entry.getValue() - 1) {
in = state.directory.openInput(entry.getKey(), state.context);
starTreeRecordRandomInput =
in.randomAccessSlice(in.getFilePointer(), in.length() - in.getFilePointer());
_numReadableStarTreeRecords = entry.getValue();
break;
}
//System.out.println("Current start : " + prevStartDocId + " - Current end : " + _numReadableStarTreeRecords);
this.prevStartDocId = prevStartDocId;
} finally {
// if (in != null) {
// in.close();
// }
prevStartDocId = entry.getValue();
}
this.prevStartDocId = prevStartDocId;

}

@@ -551,21 +521,18 @@ public void close()
boolean success = false;
try {
if (starTreeRecordFileOutput != null) {
starTreeRecordFileOutput.close();
IOUtils.deleteFilesIgnoringExceptions(state.directory, starTreeRecordFileOutput.getName());
}
success = true;
} catch (Exception e) {
throw new RuntimeException(e);
// System.out.println(e.getMessage());
} finally {
if (success) {
IOUtils.close(starTreeRecordFileOutput);
} else {
IOUtils.closeWhileHandlingException(starTreeRecordFileOutput);
}
// starTreeRecordFileOutput = null;
IOUtils.closeWhileHandlingException(starTreeRecordFileOutput);
}
// Delete all temporary segment record files
IOUtils.deleteFilesIgnoringExceptions(state.directory, segmentRecordFileOutput.getName());
// Delete all temporary star tree record files
IOUtils.deleteFilesIgnoringExceptions(state.directory, fileToByteSizeMap.keySet());
super.close();
}