diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index dbac54e81889..8f1fa311e6c9 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -323,6 +323,7 @@ void HiveDataSource::addSplit(std::shared_ptr split) { // so we initialize it beforehand. splitReader_->configureReaderOptions(randomSkip_); splitReader_->prepareSplit(metadataFilter_, runtimeStats_); + readerOutputType_ = splitReader_->readerOutputType(); } vector_size_t HiveDataSource::applyBucketConversion( @@ -378,7 +379,12 @@ std::optional HiveDataSource::next( return nullptr; } - if (!output_) { + // Bucket conversion or delta update could add extra column to reader output. + auto needsExtraColumn = [&] { + return output_->asUnchecked()->childrenSize() < + readerOutputType_->size(); + }; + if (!output_ || needsExtraColumn()) { output_ = BaseVector::create(readerOutputType_, 0, pool_); } diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index ebbb543e0e54..572e47360b19 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -147,7 +147,11 @@ void SplitReader::configureReaderOptions( void SplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) { - auto rowType = createReader(); + createReader(); + if (emptySplit_) { + return; + } + auto rowType = getAdaptedRowType(); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); @@ -221,7 +225,7 @@ std::string SplitReader::toString() const { static_cast(baseRowReader_.get())); } -RowTypePtr SplitReader::createReader() { +void SplitReader::createReader() { VELOX_CHECK_NE( baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN); @@ -237,7 +241,7 @@ RowTypePtr SplitReader::createReader() { hiveConfig_->ignoreMissingFiles( connectorQueryCtx_->sessionProperties())) { emptySplit_ = true; - return nullptr; + return; } throw; } 
@@ -258,7 +262,9 @@ RowTypePtr SplitReader::createReader() { baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.fileFormat()) ->createReader(std::move(baseFileInput), baseReaderOpts_); +} +RowTypePtr SplitReader::getAdaptedRowType() const { auto& fileType = baseReader_->rowType(); auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema()); auto columnNames = fileType->names(); diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index 5466107ca620..debf5831ef57 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -95,6 +95,10 @@ class SplitReader { void setConnectorQueryCtx(const ConnectorQueryCtx* connectorQueryCtx); + const RowTypePtr& readerOutputType() const { + return readerOutputType_; + } + std::string toString() const; protected: @@ -113,7 +117,11 @@ class SplitReader { /// Create the dwio::common::Reader object baseReader_, which will be used to /// read the data file's metadata and schema - RowTypePtr createReader(); + void createReader(); + + // Adjust the scan spec according to the current split, then return the + // adapted row type. + RowTypePtr getAdaptedRowType() const; // Check if the filters pass on the column statistics. 
When delta update is // present, the corresponding filter should be disabled before calling this @@ -155,7 +163,7 @@ class SplitReader { const ConnectorQueryCtx* connectorQueryCtx_; const std::shared_ptr hiveConfig_; - const RowTypePtr readerOutputType_; + RowTypePtr readerOutputType_; const std::shared_ptr ioStats_; FileHandleFactory* const fileHandleFactory_; folly::Executor* const executor_; diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index dd8c059a53ef..e96223b8cacf 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -55,7 +55,11 @@ IcebergSplitReader::IcebergSplitReader( void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) { - auto rowType = createReader(); + createReader(); + if (emptySplit_) { + return; + } + auto rowType = getAdaptedRowType(); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); diff --git a/velox/dwio/common/Reader.cpp b/velox/dwio/common/Reader.cpp index eb21b67b1882..7d6ee8012ae5 100644 --- a/velox/dwio/common/Reader.cpp +++ b/velox/dwio/common/Reader.cpp @@ -49,6 +49,7 @@ VectorPtr RowReader::projectColumns( } } for (auto& childSpec : spec.children()) { + VELOX_CHECK_NULL(childSpec->deltaUpdate()); VectorPtr child; if (childSpec->isConstant()) { child = BaseVector::wrapInConstant( @@ -133,7 +134,7 @@ void RowReader::readWithRowNumber( const auto& rowNumberColumnName = rowNumberColumnInfo->name; column_index_t numChildren{0}; for (auto& column : options.scanSpec()->children()) { - if (column->projectOut()) { + if (column->keepValues()) { ++numChildren; } } diff --git a/velox/dwio/common/ScanSpec.h b/velox/dwio/common/ScanSpec.h index 67744a783c7d..a040fc5316bd 100644 --- a/velox/dwio/common/ScanSpec.h +++ b/velox/dwio/common/ScanSpec.h @@ -162,7 +162,7 @@ class ScanSpec { } bool keepValues() const { - return 
projectOut_; + return projectOut_ || deltaUpdate_; } // Position in the RowVector returned by the top level scan. Applies diff --git a/velox/dwio/common/SelectiveStructColumnReader.cpp b/velox/dwio/common/SelectiveStructColumnReader.cpp index a07b04b14355..4401e73977bb 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.cpp +++ b/velox/dwio/common/SelectiveStructColumnReader.cpp @@ -487,7 +487,7 @@ void SelectiveStructColumnReaderBase::getValues( setComplexNulls(rows, *result); for (const auto& childSpec : scanSpec_->children()) { VELOX_TRACE_HISTORY_PUSH("getValues %s", childSpec->fieldName().c_str()); - if (!childSpec->projectOut()) { + if (!childSpec->keepValues()) { continue; }