[CARBONDATA-633] Fixed offheap Query Crash issue. This closes #533
jackylk committed Jan 14, 2017
2 parents 6d29fa2 + a45ace2 commit b581465
Showing 9 changed files with 41 additions and 13 deletions.
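
What the commit does, read from the hunks below: the inverted isUnsafe check in MeasureChunkStoreFactory is corrected; AbstractQueryExecutor gains a queryIterator field that both executors populate so finish() can close the iterator; AbstractDataBlockIterator now receives its BlocksChunkHolder from the caller instead of allocating one; and the freeMemory call is removed from hasNext() and reappears in AbstractDetailQueryResultIterator (when switching blocks and in a new close()) and in the filter scan path (when a blocklet is filtered out entirely). The sketch below is a minimal, hypothetical model of the resulting ownership chain — ChunkHolder, ResultIterator and Executor are stand-in names, not CarbonData classes — meant only to illustrate who frees the off-heap memory and when.

// Minimal, hypothetical model of the ownership this commit establishes.
// ChunkHolder, ResultIterator and Executor are stand-ins, not CarbonData APIs.
final class ChunkHolder {
  private boolean filled;

  void fill() { filled = true; }          // chunks read for the current block

  void freeAll() { filled = false; }      // stands in for CarbonUtil.freeMemory(dimension, measure)
}

final class ResultIterator implements AutoCloseable {
  private final ChunkHolder holder = new ChunkHolder();  // one holder for the whole query

  boolean hasNext() { return false; }     // no freeing in hasNext() any more

  @Override public void close() { holder.freeAll(); }    // mirrors the new close() override
}

final class Executor {
  private ResultIterator iterator;

  ResultIterator execute() {
    iterator = new ResultIterator();      // keep the reference, like the new queryIterator field
    return iterator;
  }

  void finish() {
    if (iterator != null) {
      iterator.close();                   // mirrors AbstractQueryExecutor.finish()
    }
  }
}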
File 1 of 9: MeasureChunkStoreFactory
@@ -62,7 +62,7 @@ private MeasureChunkStoreFactory() {
    * @return measure chunk store
    */
   public MeasureDataChunkStore getMeasureDataChunkStore(DataType dataType, int numberOfRows) {
-    if (isUnsafe) {
+    if (!isUnsafe) {
       switch (dataType) {
         case DATA_BYTE:
           return new SafeByteMeasureChunkStore(numberOfRows);
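
This one-character change corrects an inverted condition: the factory previously returned the Safe* (on-heap) chunk stores when isUnsafe was true, i.e. exactly when off-heap storage had been requested; after the fix they are returned only when off-heap storage is disabled. A stripped-down, hypothetical version of the corrected selection — SafeStore and UnsafeStore are placeholders, not the real CarbonData store classes:

// Hypothetical, stripped-down version of the corrected selection logic.
// SafeStore and UnsafeStore are placeholders for the Safe*/Unsafe* chunk store classes.
interface MeasureStore {}

final class SafeStore implements MeasureStore {}    // on-heap storage
final class UnsafeStore implements MeasureStore {}  // off-heap storage

final class StoreFactorySketch {
  private final boolean isUnsafe;

  StoreFactorySketch(boolean isUnsafe) { this.isUnsafe = isUnsafe; }

  MeasureStore getMeasureStore() {
    if (!isUnsafe) {            // fixed condition: on-heap stores only when off-heap is disabled
      return new SafeStore();
    }
    return new UnsafeStore();   // off-heap stores when off-heap storage is enabled
  }
}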
File 2 of 9: AbstractQueryExecutor
@@ -28,6 +28,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.common.logging.impl.StandardLogService;
@@ -80,6 +81,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    */
   protected QueryExecutorProperties queryProperties;
 
+  /**
+   * query result iterator which will execute the query
+   * and give the result
+   */
+  protected CarbonIterator queryIterator;
+
   public AbstractQueryExecutor() {
     queryProperties = new QueryExecutorProperties();
   }
@@ -470,6 +477,9 @@ private int[] getComplexDimensionParentBlockIndexes(List<QueryDimension> queryDi
    */
   @Override public void finish() throws QueryExecutionException {
     CarbonUtil.clearBlockCache(queryProperties.dataBlocks);
+    if(null != queryIterator) {
+      queryIterator.close();
+    }
     if (null != queryProperties.executorService) {
       queryProperties.executorService.shutdown();
       try {
File 3 of 9: DetailQueryExecutor
@@ -39,11 +39,12 @@ public class DetailQueryExecutor extends AbstractQueryExecutor<BatchResult> {
   public CarbonIterator<BatchResult> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
-    return new DetailQueryResultIterator(
+    this.queryIterator = new DetailQueryResultIterator(
         blockExecutionInfoList,
         queryModel,
         queryProperties.executorService
     );
+    return queryIterator;
   }
 
 }
File 4 of 9: VectorDetailQueryExecutor
@@ -36,11 +36,12 @@ public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
-    return new VectorDetailQueryResultIterator(
+    this.queryIterator = new VectorDetailQueryResultIterator(
         blockExecutionInfoList,
         queryModel,
         queryProperties.executorService
     );
+    return this.queryIterator;
   }
 
 }
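
Both executors now keep the iterator they hand back in the protected queryIterator field added to AbstractQueryExecutor (File 2 above), which is what lets finish() close it and release the off-heap chunks even when the caller stops iterating early. A hedged caller-side sketch of the intended call order follows; the *Like types are placeholders standing in for the executor and CarbonIterator, not real CarbonData classes.

// Hedged caller-side sketch; the *Like types below are placeholders, not CarbonData classes.
interface ResultIteratorLike<E> {
  boolean hasNext();
  E next();
}

interface QueryExecutorLike<E> {
  ResultIteratorLike<E> execute();  // stands in for execute(QueryModel); now also stores the iterator
  void finish();                    // stands in for finish(), which now closes the stored iterator
}

final class QueryCallerSketch {
  static <E> void runQuery(QueryExecutorLike<E> executor) {
    try {
      ResultIteratorLike<E> rows = executor.execute();
      while (rows.hasNext()) {
        rows.next();               // consume result batches
      }
    } finally {
      executor.finish();           // after this commit, finish() also closes the iterator,
                                   // so the off-heap chunks are freed even on early exit
    }
  }
}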
File 5 of 9: AbstractDataBlockIterator
@@ -76,14 +76,12 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
 
   protected AbstractScannedResult scannedResult;
 
-  public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo,
-      FileHolder fileReader, int batchSize, QueryStatisticsModel queryStatisticsModel) {
+  public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader,
+      int batchSize, QueryStatisticsModel queryStatisticsModel,
+      BlocksChunkHolder blockChunkHolder) {
     dataBlockIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
         blockExecutionInfo.getNumberOfBlockToScan());
-    blocksChunkHolder = new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
-        blockExecutionInfo.getTotalNumberOfMeasureBlock());
-    blocksChunkHolder.setFileReader(fileReader);
-
+    blocksChunkHolder = blockChunkHolder;
     if (blockExecutionInfo.getFilterExecuterTree() != null) {
       blockletScanner = new FilterScanner(blockExecutionInfo, queryStatisticsModel);
     } else {
File 6 of 9: DataBlockIteratorImpl
@@ -25,6 +25,7 @@
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.scan.processor.AbstractDataBlockIterator;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
 
 /**
@@ -37,8 +38,9 @@ public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
    * @param blockExecutionInfo execution information
    */
   public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader,
-      int batchSize, QueryStatisticsModel queryStatisticsModel) {
-    super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel);
+      int batchSize, QueryStatisticsModel queryStatisticsModel,
+      BlocksChunkHolder blockChunkHolder) {
+    super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel, blockChunkHolder);
   }
 
   /**
File 7 of 9
@@ -383,7 +383,6 @@ public boolean hasNext() {
     if (rowCounter < this.totalNumberOfRows) {
       return true;
     }
-    CarbonUtil.freeMemory(dataChunks, measureDataChunks);
     return false;
   }
 
File 8 of 9: AbstractDetailQueryResultIterator
@@ -35,9 +35,11 @@
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.scan.model.QueryModel;
 import org.apache.carbondata.scan.processor.AbstractDataBlockIterator;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.scan.processor.impl.DataBlockIteratorImpl;
 import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
 
@@ -87,6 +89,8 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
    */
   QueryStatisticsModel queryStatisticsModel;
 
+  private BlocksChunkHolder blocksChunkHolder;
+
   public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
       ExecutorService execService) {
     String batchSizeString =
@@ -101,10 +105,13 @@ public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryMo
     } else {
       batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
     }
+    this.blocksChunkHolder = new BlocksChunkHolder(infos.get(0).getTotalNumberDimensionBlock(),
+        infos.get(0).getTotalNumberOfMeasureBlock());
     this.recorder = queryModel.getStatisticsRecorder();
     this.blockExecutionInfos = infos;
     this.fileReader = FileFactory.getFileHolder(
         FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getStorePath()));
+    this.blocksChunkHolder.setFileReader(fileReader);
     this.execService = execService;
     intialiseInfos();
     initQueryStatiticsModel();
@@ -163,7 +170,10 @@ private DataBlockIteratorImpl getDataBlockIterator() {
       BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
       blockExecutionInfos.remove(executionInfo);
       queryStatisticsModel.setRecorder(recorder);
-      return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize, queryStatisticsModel);
+      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
+          blocksChunkHolder.getMeasureDataChunk());
+      return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize, queryStatisticsModel,
+          blocksChunkHolder);
     }
     return null;
   }
@@ -182,4 +192,9 @@ public void processNextBatch(CarbonColumnarBatch columnarBatch) {
     throw new UnsupportedOperationException("Please use VectorDetailQueryResultIterator");
   }
 
+  @Override public void close() {
+    CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
+        blocksChunkHolder.getMeasureDataChunk());
+  }
+
 }
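
Files 5 through 8 move ownership of the BlocksChunkHolder: AbstractDetailQueryResultIterator creates it once per query, wires in the file reader, and passes it down through DataBlockIteratorImpl's new constructor parameter; the chunks it holds are freed when getDataBlockIterator() advances to the next block and again in the new close() override, which replaces the freeMemory call removed from hasNext() in File 7. A simplified, hypothetical model of that handoff — HolderSketch, BlockIteratorSketch and QueryIteratorSketch are stand-ins, not CarbonData classes:

// Simplified, hypothetical model of the shared-holder handoff (placeholder names).
final class HolderSketch {
  private boolean filled;

  void fill() { filled = true; }     // chunks read for the current data block
  void free() { filled = false; }    // stands in for CarbonUtil.freeMemory(dimension, measure chunks)
}

final class BlockIteratorSketch {
  private final HolderSketch holder;

  BlockIteratorSketch(HolderSketch holder) {
    this.holder = holder;            // holder is injected, no longer allocated per block iterator
  }

  void scan() { holder.fill(); }     // read the block's chunks into the shared holder
}

final class QueryIteratorSketch implements AutoCloseable {
  private final HolderSketch holder = new HolderSketch();  // one holder per query
  private int remainingBlocks;

  QueryIteratorSketch(int blocks) { this.remainingBlocks = blocks; }

  BlockIteratorSketch nextBlockIterator() {
    if (remainingBlocks == 0) {
      return null;
    }
    remainingBlocks--;
    holder.free();                   // release the previous block's chunks before reuse
    return new BlockIteratorSketch(holder);
  }

  @Override public void close() { holder.free(); }         // final release when the query finishes
}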
File 9 of 9
@@ -134,6 +134,8 @@ private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
     if (bitSet.isEmpty()) {
       scannedResult.setNumberOfRows(0);
       scannedResult.setIndexes(new int[0]);
+      CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
+          blocksChunkHolder.getMeasureDataChunk());
       return;
     }
     // valid scanned blocklet
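
The final hunk covers the filter path: if the filter bitset leaves no matching rows in a blocklet, the chunks just read for it are released immediately, since no result iterator will consume — and therefore free — them later. A small hedged sketch of that early-release guard, with placeholder names:

import java.util.BitSet;

// Hypothetical guard mirroring the early release added to the filter scan path.
final class FilterScanSketch {
  static void fillScannedResult(BitSet matchingRows, Runnable freeChunks) {
    if (matchingRows.isEmpty()) {
      freeChunks.run();  // nothing survived the filter: release the blocklet's chunks right away,
                         // as the CarbonUtil.freeMemory(...) call in the hunk above does
      return;
    }
    // otherwise keep the chunks; the result iterator releases them later
  }
}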
