Skip to content

Commit

Permalink
fix: Support delta update on filter that is not projected out (facebo…
Browse files Browse the repository at this point in the history
…okincubator#12285)

Summary: Pull Request resolved: facebookincubator#12285

Reviewed By: pedroerp

Differential Revision: D69324241

fbshipit-source-id: 2649bcb2a71522eff949bb9569dd5008cf095fbc
  • Loading branch information
Yuhta authored and facebook-github-bot committed Feb 10, 2025
1 parent b673c2d commit b59f68a
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 10 deletions.
8 changes: 7 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
// so we initialize it beforehand.
splitReader_->configureReaderOptions(randomSkip_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
readerOutputType_ = splitReader_->readerOutputType();
}

vector_size_t HiveDataSource::applyBucketConversion(
Expand Down Expand Up @@ -378,7 +379,12 @@ std::optional<RowVectorPtr> HiveDataSource::next(
return nullptr;
}

if (!output_) {
// Bucket conversion or delta update could add extra column to reader output.
auto needsExtraColumn = [&] {
return output_->asUnchecked<RowVector>()->childrenSize() <
readerOutputType_->size();
};
if (!output_ || needsExtraColumn()) {
output_ = BaseVector::create(readerOutputType_, 0, pool_);
}

Expand Down
12 changes: 9 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ void SplitReader::configureReaderOptions(
void SplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats) {
auto rowType = createReader();
createReader();
if (emptySplit_) {
return;
}
auto rowType = getAdaptedRowType();

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
Expand Down Expand Up @@ -221,7 +225,7 @@ std::string SplitReader::toString() const {
static_cast<const void*>(baseRowReader_.get()));
}

RowTypePtr SplitReader::createReader() {
void SplitReader::createReader() {
VELOX_CHECK_NE(
baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN);

Expand All @@ -237,7 +241,7 @@ RowTypePtr SplitReader::createReader() {
hiveConfig_->ignoreMissingFiles(
connectorQueryCtx_->sessionProperties())) {
emptySplit_ = true;
return nullptr;
return;
}
throw;
}
Expand All @@ -258,7 +262,9 @@ RowTypePtr SplitReader::createReader() {

baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.fileFormat())
->createReader(std::move(baseFileInput), baseReaderOpts_);
}

RowTypePtr SplitReader::getAdaptedRowType() const {
auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema());
auto columnNames = fileType->names();
Expand Down
12 changes: 10 additions & 2 deletions velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ class SplitReader {

void setConnectorQueryCtx(const ConnectorQueryCtx* connectorQueryCtx);

const RowTypePtr& readerOutputType() const {
return readerOutputType_;
}

std::string toString() const;

protected:
Expand All @@ -113,7 +117,11 @@ class SplitReader {

/// Create the dwio::common::Reader object baseReader_, which will be used to
/// read the data file's metadata and schema
RowTypePtr createReader();
void createReader();

// Adjust the scan spec according to the current split, then return the
// adapted row type.
RowTypePtr getAdaptedRowType() const;

// Check if the filters pass on the column statistics. When delta update is
// present, the corresonding filter should be disabled before calling this
Expand Down Expand Up @@ -155,7 +163,7 @@ class SplitReader {
const ConnectorQueryCtx* connectorQueryCtx_;
const std::shared_ptr<const HiveConfig> hiveConfig_;

const RowTypePtr readerOutputType_;
RowTypePtr readerOutputType_;
const std::shared_ptr<io::IoStatistics> ioStats_;
FileHandleFactory* const fileHandleFactory_;
folly::Executor* const executor_;
Expand Down
6 changes: 5 additions & 1 deletion velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ IcebergSplitReader::IcebergSplitReader(
void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats) {
auto rowType = createReader();
createReader();
if (emptySplit_) {
return;
}
auto rowType = getAdaptedRowType();

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/common/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ VectorPtr RowReader::projectColumns(
}
}
for (auto& childSpec : spec.children()) {
VELOX_CHECK_NULL(childSpec->deltaUpdate());
VectorPtr child;
if (childSpec->isConstant()) {
child = BaseVector::wrapInConstant(
Expand Down Expand Up @@ -133,7 +134,7 @@ void RowReader::readWithRowNumber(
const auto& rowNumberColumnName = rowNumberColumnInfo->name;
column_index_t numChildren{0};
for (auto& column : options.scanSpec()->children()) {
if (column->projectOut()) {
if (column->keepValues()) {
++numChildren;
}
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/ScanSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class ScanSpec {
}

bool keepValues() const {
return projectOut_;
return projectOut_ || deltaUpdate_;
}

// Position in the RowVector returned by the top level scan. Applies
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ void SelectiveStructColumnReaderBase::getValues(
setComplexNulls(rows, *result);
for (const auto& childSpec : scanSpec_->children()) {
VELOX_TRACE_HISTORY_PUSH("getValues %s", childSpec->fieldName().c_str());
if (!childSpec->projectOut()) {
if (!childSpec->keepValues()) {
continue;
}

Expand Down

0 comments on commit b59f68a

Please sign in to comment.