Skip to content

Commit

Permalink
SplitReader refactor
Browse files Browse the repository at this point in the history
To prepare for the upcoming equality delete file read, we need to
refactor the SplitReader a bit.
  • Loading branch information
yingsu00 committed Mar 7, 2024
1 parent 6f189b0 commit 986afeb
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 107 deletions.
227 changes: 123 additions & 104 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,105 @@ void SplitReader::configureReaderOptions(

void SplitReader::prepareSplit(
    std::shared_ptr<common::MetadataFilter> metadataFilter,
    std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
    dwio::common::RuntimeStatistics& runtimeStats) {
  // Build the file reader first; deciding whether the split is empty needs it.
  createReader();

  emptySplit_ = false;
  if (!testEmptySplit(runtimeStats)) {
    // The split has rows to scan; set up the row reader for it.
    createRowReader(metadataFilter);
    return;
  }
  emptySplit_ = true;
}

uint64_t SplitReader::next(uint64_t size, VectorPtr& output) {
  // With sampling enabled, hand the random-skip generator to the row reader
  // through a Mutation; otherwise do a plain read.
  if (baseReaderOpts_.randomSkip()) {
    dwio::common::Mutation mutation;
    mutation.randomSkip = baseReaderOpts_.randomSkip().get();
    return baseRowReader_->next(size, output, &mutation);
  }
  return baseRowReader_->next(size, output);
}

void SplitReader::resetFilterCaches() {
  // No-op when the row reader has not been created yet.
  if (!baseRowReader_) {
    return;
  }
  baseRowReader_->resetFilterCaches();
}

// Returns true if prepareSplit() determined that the current split yields no
// rows and can be skipped entirely.
bool SplitReader::emptySplit() const {
  return emptySplit_;
}

// Drops the reference to the current split; the reader objects themselves are
// left untouched.
void SplitReader::resetSplit() {
  hiveSplit_.reset();
}

int64_t SplitReader::estimatedRowSize() const {
  // Without a row reader there is nothing to estimate from.
  if (!baseRowReader_) {
    return DataSource::kUnknownRowSize;
  }
  if (const auto size = baseRowReader_->estimatedRowSize(); size.has_value()) {
    return size.value();
  }
  return DataSource::kUnknownRowSize;
}

void SplitReader::updateRuntimeStats(
    dwio::common::RuntimeStatistics& stats) const {
  // Stats are accumulated by the row reader; nothing to report before it
  // exists.
  if (!baseRowReader_) {
    return;
  }
  baseRowReader_->updateRuntimeStats(stats);
}

// True only when a row reader exists and it reports that all its prefetches
// have been issued.
bool SplitReader::allPrefetchIssued() const {
  return baseRowReader_ && baseRowReader_->allPrefetchIssued();
}

void SplitReader::setPartitionValue(
    common::ScanSpec* spec,
    const std::string& partitionKey,
    const std::optional<std::string>& value) const {
  const auto iter = partitionKeys_->find(partitionKey);
  VELOX_CHECK(
      iter != partitionKeys_->end(),
      "ColumnHandle is missing for partition key {}",
      partitionKey);
  auto columnType = iter->second->dataType();
  // Materialize the partition value as a 1-element constant vector of the
  // column's type; every row of the split sees the same value.
  auto constantVector = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
      newConstantFromString,
      columnType->kind(),
      columnType,
      value,
      1,
      connectorQueryCtx_->memoryPool());
  spec->setConstantValue(constantVector);
}

// Builds a human-readable summary of the reader state for debugging: split,
// scan spec, output type, partition key handles, and the reader/rowReader
// pointer identities.
std::string SplitReader::toString() const {
  std::string partitionKeys;
  // Iterate by const reference: the original std::for_each lambda took the
  // map pair by value, copying the string key and bumping the HiveColumnHandle
  // shared_ptr refcount for every entry.
  for (const auto& column : *partitionKeys_) {
    partitionKeys += " " + column.second->toString();
  }
  return fmt::format(
      "SplitReader: hiveSplit_{} scanSpec_{} readerOutputType_{} partitionKeys_{} reader{} rowReader{}",
      hiveSplit_->toString(),
      scanSpec_->toString(),
      readerOutputType_->toString(),
      partitionKeys,
      static_cast<const void*>(baseReader_.get()),
      static_cast<const void*>(baseRowReader_.get()));
}

void SplitReader::createReader() {
VELOX_CHECK_NE(
baseReaderOpts_.getFileFormat(), dwio::common::FileFormat::UNKNOWN);

Expand All @@ -157,6 +255,7 @@ void SplitReader::prepareSplit(
throw;
}
}

// Here we keep adding new entries to CacheTTLController when new fileHandles
// are generated, if CacheTTLController was created. Creator of
// CacheTTLController needs to make sure a size control strategy was available
Expand All @@ -169,12 +268,31 @@ void SplitReader::prepareSplit(

baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat())
->createReader(std::move(baseFileInput), baseReaderOpts_);
}

// Creates 'baseRowReader_' for the current split: adapts the requested columns
// to the file's schema, configures the row reader options (including the
// metadata filter used for stats-based skipping), and instantiates the row
// reader. Requires 'baseReader_' to have been created already.
void SplitReader::createRowReader(
    std::shared_ptr<common::MetadataFilter> metadataFilter) {
  auto& fileType = baseReader_->rowType();
  // Resolve each requested column against the file schema; returns the
  // per-column types used to build the reader's output row type below.
  auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema());

  configureRowReaderOptions(
      baseRowReaderOpts_,
      hiveTableHandle_->tableParameters(),
      scanSpec_,
      metadataFilter,
      ROW(std::vector<std::string>(fileType->names()), std::move(columnTypes)),
      hiveSplit_);
  // NOTE: we firstly reset the finished 'baseRowReader_' of previous split
  // before setting up for the next one to avoid doubling the peak memory usage.
  baseRowReader_.reset();
  baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);
}

bool SplitReader::testEmptySplit(
dwio::common::RuntimeStatistics& runtimeStats) {
// Note that this doesn't apply to Hudi tables.
emptySplit_ = false;
if (baseReader_->numberOfRows() == 0) {
emptySplit_ = true;
return;
return true;
}

// Check filters and see if the whole split can be skipped. Note that this
Expand All @@ -185,26 +303,12 @@ void SplitReader::prepareSplit(
hiveSplit_->filePath,
hiveSplit_->partitionKeys,
partitionKeys_)) {
emptySplit_ = true;
++runtimeStats.skippedSplits;
runtimeStats.skippedSplitBytes += hiveSplit_->length;
return;
return true;
}

auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema());

configureRowReaderOptions(
baseRowReaderOpts_,
hiveTableHandle_->tableParameters(),
scanSpec_,
metadataFilter,
ROW(std::vector<std::string>(fileType->names()), std::move(columnTypes)),
hiveSplit_);
// NOTE: we firstly reset the finished 'baseRowReader_' of previous split
// before setting up for the next one to avoid doubling the peak memory usage.
baseRowReader_.reset();
baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);
return false;
}

std::vector<TypePtr> SplitReader::adaptColumns(
Expand Down Expand Up @@ -280,91 +384,6 @@ std::vector<TypePtr> SplitReader::adaptColumns(
return columnTypes;
}

uint64_t SplitReader::next(int64_t size, VectorPtr& output) {
  // With sampling enabled, hand the random-skip generator to the row reader
  // through a Mutation; otherwise do a plain read.
  if (baseReaderOpts_.randomSkip()) {
    dwio::common::Mutation mutation;
    mutation.randomSkip = baseReaderOpts_.randomSkip().get();
    return baseRowReader_->next(size, output, &mutation);
  }
  return baseRowReader_->next(size, output);
}

// Clears the row reader's filter caches, if a row reader exists.
void SplitReader::resetFilterCaches() {
  if (baseRowReader_) {
    baseRowReader_->resetFilterCaches();
  }
}

// Returns true if prepareSplit() determined that the current split yields no
// rows and can be skipped entirely.
bool SplitReader::emptySplit() const {
  return emptySplit_;
}

// Drops the reference to the current split; the reader objects themselves are
// left untouched.
void SplitReader::resetSplit() {
  hiveSplit_.reset();
}

int64_t SplitReader::estimatedRowSize() const {
  // Without a row reader there is nothing to estimate from.
  if (!baseRowReader_) {
    return DataSource::kUnknownRowSize;
  }
  if (const auto size = baseRowReader_->estimatedRowSize(); size.has_value()) {
    return size.value();
  }
  return DataSource::kUnknownRowSize;
}

// Folds the row reader's runtime statistics into 'stats'; no-op before the
// row reader is created.
void SplitReader::updateRuntimeStats(
    dwio::common::RuntimeStatistics& stats) const {
  if (baseRowReader_) {
    baseRowReader_->updateRuntimeStats(stats);
  }
}

// True only when a row reader exists and it reports that all its prefetches
// have been issued.
bool SplitReader::allPrefetchIssued() const {
  return baseRowReader_ && baseRowReader_->allPrefetchIssued();
}

void SplitReader::setPartitionValue(
    common::ScanSpec* spec,
    const std::string& partitionKey,
    const std::optional<std::string>& value) const {
  const auto iter = partitionKeys_->find(partitionKey);
  VELOX_CHECK(
      iter != partitionKeys_->end(),
      "ColumnHandle is missing for partition key {}",
      partitionKey);
  auto columnType = iter->second->dataType();
  // Materialize the partition value as a 1-element constant vector of the
  // column's type; every row of the split sees the same value.
  auto constantVector = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
      newConstantFromString,
      columnType->kind(),
      columnType,
      value,
      1,
      connectorQueryCtx_->memoryPool());
  spec->setConstantValue(constantVector);
}

// Builds a human-readable summary of the reader state for debugging: split,
// scan spec, output type, partition key handles, and the reader/rowReader
// pointer identities.
std::string SplitReader::toString() const {
  std::string partitionKeys;
  // Iterate by const reference: the original std::for_each lambda took the
  // map pair by value, copying the string key and bumping the HiveColumnHandle
  // shared_ptr refcount for every entry.
  for (const auto& column : *partitionKeys_) {
    partitionKeys += " " + column.second->toString();
  }
  return fmt::format(
      "SplitReader: hiveSplit_{} scanSpec_{} readerOutputType_{} partitionKeys_{} reader{} rowReader{}",
      hiveSplit_->toString(),
      scanSpec_->toString(),
      readerOutputType_->toString(),
      partitionKeys,
      static_cast<const void*>(baseReader_.get()),
      static_cast<const void*>(baseRowReader_.get()));
}

} // namespace facebook::velox::connector::hive

template <>
Expand Down
11 changes: 8 additions & 3 deletions velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ class SplitReader {
files or log files, and add column adaptations for metadata columns
virtual void prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
dwio::common::RuntimeStatistics& runtimeStats);

virtual uint64_t next(int64_t size, VectorPtr& output);
virtual uint64_t next(uint64_t size, VectorPtr& output);

void resetFilterCaches();

Expand All @@ -110,6 +111,12 @@ class SplitReader {
std::string toString() const;

protected:
void createReader();

bool testEmptySplit(dwio::common::RuntimeStatistics& runtimeStats);

void createRowReader(std::shared_ptr<common::MetadataFilter> metadataFilter);

// Different table formats may have different metadata columns. This function
// will be used to update the scanSpec for these columns.
virtual std::vector<TypePtr> adaptColumns(
Expand Down Expand Up @@ -137,8 +144,6 @@ class SplitReader {
std::shared_ptr<io::IoStatistics> ioStats_;
dwio::common::ReaderOptions baseReaderOpts_;
dwio::common::RowReaderOptions baseRowReaderOpts_;

private:
bool emptySplit_;
};

Expand Down

0 comments on commit 986afeb

Please sign in to comment.