diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 2525064cee85..88bb3a5a3afb 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -634,12 +634,18 @@ namespace { bool applyPartitionFilter( const TypePtr& type, const std::string& partitionValue, + bool isPartitionDateDaysSinceEpoch, common::Filter* filter) { if (type->isDate()) { - const auto result = util::fromDateString( - StringView(partitionValue), util::ParseMode::kPrestoCast); - VELOX_CHECK(!result.hasError()); - return applyFilter(*filter, result.value()); + int32_t result = 0; + // days_since_epoch partition values are integers in string format. Eg. + // Iceberg partition values. + if (isPartitionDateDaysSinceEpoch) { + result = folly::to(partitionValue); + } else { + result = DATE()->toDays(static_cast(partitionValue)); + } + return applyFilter(*filter, result); } switch (type->kind()) { @@ -697,10 +703,15 @@ bool testFilters( const auto handlesIter = partitionKeysHandle.find(name); VELOX_CHECK(handlesIter != partitionKeysHandle.end()); + auto partitionDateValueFormat = + handlesIter->second->getColumnParameterValue( + dwio::common::ColumnParameter::kPartitionDateValueFormat); // This is a non-null partition key return applyPartitionFilter( handlesIter->second->dataType(), iter->second.value(), + partitionDateValueFormat.has_value() && + partitionDateValueFormat.value() == "days_since_epoch", child->filter()); } // Column is missing, most likely due to schema evolution. Or it's a diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 572e47360b19..8740833754a4 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -35,14 +35,22 @@ VectorPtr newConstantFromString( const std::optional& value, vector_size_t size, velox::memory::MemoryPool* pool, - const std::string& sessionTimezone) { + const std::string& sessionTimezone, + bool isPartitionDateDaysSinceEpoch = false) { using T = typename TypeTraits::NativeType; if (!value.has_value()) { return std::make_shared>(pool, size, true, type, T()); } if (type->isDate()) { - auto days = DATE()->toDays(static_cast(value.value())); + int32_t days = 0; + // For Iceberg, the date partition values are already in daysSinceEpoch + // form. + if (isPartitionDateDaysSinceEpoch) { + days = folly::to(value.value()); + } else { + days = DATE()->toDays(static_cast(value.value())); + } return std::make_shared>( pool, size, false, type, std::move(days)); } @@ -388,6 +396,8 @@ void SplitReader::setPartitionValue( "ColumnHandle is missing for partition key {}", partitionKey); auto type = it->second->dataType(); + auto partitionDataValueFormat = it->second->getColumnParameterValue( + dwio::common::ColumnParameter::kPartitionDateValueFormat); auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( newConstantFromString, type->kind(), @@ -395,7 +405,9 @@ void SplitReader::setPartitionValue( value, 1, connectorQueryCtx_->memoryPool(), - connectorQueryCtx_->sessionTimezone()); + connectorQueryCtx_->sessionTimezone(), + partitionDataValueFormat.has_value() && + partitionDataValueFormat.value() == "days_since_epoch"); spec->setConstantValue(constant); } diff --git a/velox/connectors/hive/TableHandle.h b/velox/connectors/hive/TableHandle.h index 14916f51a734..5584054646d6 100644 --- a/velox/connectors/hive/TableHandle.h +++ b/velox/connectors/hive/TableHandle.h @@ -45,12 +45,14 @@ class HiveColumnHandle : public ColumnHandle { ColumnType columnType, TypePtr dataType, TypePtr hiveType, - std::vector requiredSubfields = {}) + std::vector requiredSubfields = {}, + const std::unordered_map& columnParameters = {}) : name_(name), columnType_(columnType), dataType_(std::move(dataType)), hiveType_(std::move(hiveType)), - requiredSubfields_(std::move(requiredSubfields)) { + requiredSubfields_(std::move(requiredSubfields)), + columnParameters_(columnParameters) { VELOX_USER_CHECK( dataType_->equivalent(*hiveType_), "data type {} and hive type {} do not match", @@ -96,6 +98,18 @@ class HiveColumnHandle : public ColumnHandle { return columnType_ == ColumnType::kPartitionKey; } + const std::unordered_map& columnParameters() const { + return columnParameters_; + } + + const std::optional getColumnParameterValue( + const std::string& columnParameter) { + if (columnParameters_.count(columnParameter) > 0) { + return columnParameters_.at(columnParameter); + } + return {}; + } + std::string toString() const; folly::dynamic serialize() const override; @@ -115,6 +129,9 @@ class HiveColumnHandle : public ColumnHandle { const TypePtr dataType_; const TypePtr hiveType_; const std::vector requiredSubfields_; + // The column parameters are used for metadata like Iceberg date partition + // value format. + const std::unordered_map columnParameters_; }; class HiveTableHandle : public ConnectorTableHandle { diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 7416713868ad..517a58a29d41 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" @@ -225,8 +226,71 @@ class HiveIcebergTest : public HiveConnectorTestBase { ASSERT_TRUE(it->second.peakMemoryBytes > 0); } + void assertQuery( + RowTypePtr rowType, + const std::vector& dataVectors, + const std::string duckDbSql, + const std::unordered_map> + partitionKeys = {}, + const std::vector filters = {}) { + VELOX_CHECK(!duckDbSql.empty(), "DuckDb sql is empty"); + auto dataFilePath = TempFilePath::create(); + + writeToFile( + dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_); + std::vector> splits; + splits.emplace_back( + makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys)); + + std::unordered_set partitionColumns; + + for (auto partitionKey : partitionKeys) { + partitionColumns.insert(partitionKey.first); + } + + auto planBuilder = new PlanBuilder(pool_.get()); + auto plan = PlanBuilder::TableScanBuilder(*planBuilder) + .outputType(rowType) + .subfieldFilters(filters) + .partitionKeys(partitionColumns) + .tableFormat("iceberg") + .endTableScan() + .planNode(); + HiveConnectorTestBase::assertQuery(plan, splits, duckDbSql); + } + const static int rowCount = 20000; + protected: + std::shared_ptr config_; + std::function()> flushPolicyFactory_; + + std::shared_ptr makeIcebergSplit( + const std::string& dataFilePath, + const std::vector& deleteFiles = {}, + const std::unordered_map> + partitionKeys = {}) { + std::unordered_map customSplitInfo; + customSplitInfo["table_format"] = "hive-iceberg"; + + auto file = filesystems::getFileSystem(dataFilePath, nullptr) + ->openFileForRead(dataFilePath); + const int64_t fileSize = file->size(); + + return std::make_shared( + kHiveConnectorId, + dataFilePath, + fileFomat_, + 0, + fileSize, + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + /*cacheable=*/true, + deleteFiles); + } + private: std::map> writeDataFiles( std::map> rowGroupSizesForFiles) { @@ -335,31 +399,6 @@ class HiveIcebergTest : public HiveConnectorTestBase { return vectors; } - std::shared_ptr makeIcebergSplit( - const std::string& dataFilePath, - const std::vector& deleteFiles = {}) { - std::unordered_map> partitionKeys; - std::unordered_map customSplitInfo; - customSplitInfo["table_format"] = "hive-iceberg"; - - auto file = filesystems::getFileSystem(dataFilePath, nullptr) - ->openFileForRead(dataFilePath); - const int64_t fileSize = file->size(); - - return std::make_shared( - kHiveConnectorId, - dataFilePath, - fileFomat_, - 0, - fileSize, - partitionKeys, - std::nullopt, - customSplitInfo, - nullptr, - /*cacheable=*/true, - deleteFiles); - } - std::string getDuckDBQuery( const std::map>& rowGroupSizesForFiles, const std::unordered_map< @@ -478,8 +517,6 @@ class HiveIcebergTest : public HiveConnectorTestBase { } dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF}; - std::shared_ptr config_; - std::function()> flushPolicyFactory_; RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})}; std::shared_ptr pathColumn_ = @@ -660,4 +697,79 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) { assertMultipleSplits({}, 10, 3); } +TEST_F(HiveIcebergTest, testPartitionedRead) { + RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})}; + std::unordered_map> partitionKeys; + // Iceberg API sets partition values for dates to daysSinceEpoch, so + // in velox, we do not need to convert it to days. + // Date = 2018-04-06, daysSinceEpoch = 17627 + partitionKeys["ds"] = "17627"; + + // Test query on two partitions ds=17627, ds=17628 + std::vector> splits; + std::vector> dataFilePaths; + int32_t daysSinceEpoch; + for (int i = 0; i <= 1; ++i) { + std::vector dataVectors; + daysSinceEpoch = 17627 + i; + VectorPtr c0 = makeFlatVector((std::vector){i}); + VectorPtr ds = + makeFlatVector((std::vector){daysSinceEpoch}); + dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds})); + + auto dataFilePath = TempFilePath::create(); + dataFilePaths.push_back(dataFilePath); + writeToFile( + dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_); + partitionKeys["ds"] = std::to_string(daysSinceEpoch); + splits.emplace_back( + makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys)); + } + + std::unordered_set partitionColumns; + + for (auto partitionKey : partitionKeys) { + partitionColumns.insert(partitionKey.first); + } + + auto planBuilder = new PlanBuilder(pool_.get()); + auto plan = planBuilder->startTableScan() + .outputType(rowType) + .partitionKeys(partitionColumns) + .tableFormat("iceberg") + .endTableScan() + .planNode(); + HiveConnectorTestBase::assertQuery( + plan, + splits, + "SELECT * FROM (VALUES (0, '2018-04-06'), (1, '2018-04-07'))", + 0); + + // Filter on non-partitioned non-date column + std::vector nonPartitionFilters = {"c0 = 1"}; + plan = planBuilder->startTableScan() + .outputType(rowType) + .partitionKeys(partitionColumns) + .subfieldFilters(nonPartitionFilters) + .tableFormat("iceberg") + .endTableScan() + .planNode(); + HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 1, '2018-04-07'"); + + // Test filter on non-partitioned date column + std::vector filters = {"ds = date'2018-04-06'"}; + plan = planBuilder->startTableScan() + .outputType(rowType) + .subfieldFilters(filters) + .tableFormat("iceberg") + .endTableScan() + .planNode(); + splits.clear(); + for (auto dataFilePath : dataFilePaths) { + splits.emplace_back(makeIcebergSplit(dataFilePath->getPath(), {}, {})); + } + + HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 0, '2018-04-06'"); +} + } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 987e0eb76462..097e58578da3 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -110,6 +110,12 @@ struct TableParameter { "serialization.null.format"; }; +struct ColumnParameter { + // Describes the format of the date partition value. + static constexpr const char* kPartitionDateValueFormat = + "partition.date.value.format"; +}; + /// Implicit row number column to be added. This column will be removed in the /// output of split reader. Should use the ScanSpec::ColumnType::kRowIndex if /// the column is suppose to be explicit and kept in the output. diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 8739ae9cd965..5b04910c0ffb 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -74,7 +74,8 @@ PlanBuilder& PlanBuilder::tableScan( const RowTypePtr& dataColumns, const std::unordered_map< std::string, - std::shared_ptr>& assignments) { + std::shared_ptr>& assignments, + const std::unordered_set& partitionKeys) { return TableScanBuilder(*this) .outputType(outputType) .assignments(assignments) @@ -82,6 +83,7 @@ PlanBuilder& PlanBuilder::tableScan( .remainingFilter(remainingFilter) .dataColumns(dataColumns) .assignments(assignments) + .partitionKeys(partitionKeys) .endTableScan(); } @@ -170,13 +172,26 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) { } if (!hasAssignments) { + auto columnType = HiveColumnHandle::ColumnType::kRegular; + if (partitionKeys_.count(name) > 0) { + columnType = HiveColumnHandle::ColumnType::kPartitionKey; + } + std::unordered_map columnParameters; + if (tableFormat_ == "iceberg" && type->isDate()) { + columnParameters.insert( + {dwio::common::ColumnParameter::kPartitionDateValueFormat, + "days_since_epoch"}); + } + std::vector requiredSubFields; assignments_.insert( {name, std::make_shared( hiveColumnName, - HiveColumnHandle::ColumnType::kRegular, + columnType, + type, type, - type)}); + std::move(requiredSubFields), + columnParameters)}); } } diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 0c4c5ee68b52..840de300ac58 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -131,6 +131,7 @@ class PlanBuilder { /// you define the output types only. See 'missingColumns' test in /// 'TableScanTest'. /// @param assignments Optional ColumnHandles. + /// @param partitionKeys Optional partition keys. PlanBuilder& tableScan( const RowTypePtr& outputType, const std::vector& subfieldFilters = {}, @@ -138,7 +139,8 @@ class PlanBuilder { const RowTypePtr& dataColumns = nullptr, const std::unordered_map< std::string, - std::shared_ptr>& assignments = {}); + std::shared_ptr>& assignments = {}, + const std::unordered_set& partitionKeys = {}); /// Add a TableScanNode to scan a Hive table. /// @@ -277,6 +279,17 @@ class PlanBuilder { return *this; } + TableScanBuilder& partitionKeys( + std::unordered_set partitionKeys) { + partitionKeys_ = std::move(partitionKeys); + return *this; + } + + TableScanBuilder& tableFormat(std::string tableFormat) { + tableFormat_ = tableFormat; + return *this; + } + /// Stop the TableScanBuilder. PlanBuilder& endTableScan() { planBuilder_.planNode_ = build(planBuilder_.nextPlanNodeId()); @@ -298,6 +311,8 @@ class PlanBuilder { std::shared_ptr tableHandle_; std::unordered_map> assignments_; + std::unordered_set partitionKeys_; + std::string tableFormat_; }; /// Start a TableScanBuilder.