From 3804e102fcc99b5a8791ec05c4522715b53da721 Mon Sep 17 00:00:00 2001 From: Mahadevuni Naveen Kumar Date: Fri, 7 Feb 2025 23:26:47 +0530 Subject: [PATCH] fix(iceberg): Date partition value parse issue --- velox/connectors/hive/HiveConnectorUtil.cpp | 19 +- velox/connectors/hive/SplitReader.cpp | 18 +- velox/connectors/hive/TableHandle.cpp | 41 ++++- velox/connectors/hive/TableHandle.h | 21 ++- .../hive/iceberg/tests/IcebergReadTest.cpp | 163 +++++++++++++++--- .../connectors/hive/tests/TableHandleTest.cpp | 9 +- velox/dwio/common/Options.h | 6 + .../tests/utils/HiveConnectorTestBase.cpp | 10 +- .../exec/tests/utils/HiveConnectorTestBase.h | 4 +- velox/exec/tests/utils/PlanBuilder.cpp | 21 ++- velox/exec/tests/utils/PlanBuilder.h | 17 +- 11 files changed, 283 insertions(+), 46 deletions(-) diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 2525064cee85..88bb3a5a3afb 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -634,12 +634,18 @@ namespace { bool applyPartitionFilter( const TypePtr& type, const std::string& partitionValue, + bool isPartitionDateDaysSinceEpoch, common::Filter* filter) { if (type->isDate()) { - const auto result = util::fromDateString( - StringView(partitionValue), util::ParseMode::kPrestoCast); - VELOX_CHECK(!result.hasError()); - return applyFilter(*filter, result.value()); + int32_t result = 0; + // days_since_epoch partition values are integers in string format. Eg. + // Iceberg partition values. + if (isPartitionDateDaysSinceEpoch) { + result = folly::to(partitionValue); + } else { + result = DATE()->toDays(static_cast(partitionValue)); + } + return applyFilter(*filter, result); } switch (type->kind()) { @@ -697,10 +703,15 @@ bool testFilters( const auto handlesIter = partitionKeysHandle.find(name); VELOX_CHECK(handlesIter != partitionKeysHandle.end()); + auto partitionDateValueFormat = + handlesIter->second->getColumnParameterValue( + dwio::common::ColumnParameter::kPartitionDateValueFormat); // This is a non-null partition key return applyPartitionFilter( handlesIter->second->dataType(), iter->second.value(), + partitionDateValueFormat.has_value() && + partitionDateValueFormat.value() == "days_since_epoch", child->filter()); } // Column is missing, most likely due to schema evolution. Or it's a diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index fd30a77558c6..b864c0b4469c 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -36,14 +36,22 @@ VectorPtr newConstantFromString( vector_size_t size, velox::memory::MemoryPool* pool, const std::string& sessionTimezone, - bool asLocalTime) { + bool asLocalTime, + bool isPartitionDateDaysSinceEpoch = false) { using T = typename TypeTraits::NativeType; if (!value.has_value()) { return std::make_shared>(pool, size, true, type, T()); } if (type->isDate()) { - auto days = DATE()->toDays(static_cast(value.value())); + int32_t days = 0; + // For Iceberg, the date partition values are already in daysSinceEpoch + // form. + if (isPartitionDateDaysSinceEpoch) { + days = folly::to(value.value()); + } else { + days = DATE()->toDays(static_cast(value.value())); + } return std::make_shared>( pool, size, false, type, std::move(days)); } @@ -393,6 +401,8 @@ void SplitReader::setPartitionValue( "ColumnHandle is missing for partition key {}", partitionKey); auto type = it->second->dataType(); + auto partitionDataValueFormat = it->second->getColumnParameterValue( + dwio::common::ColumnParameter::kPartitionDateValueFormat); auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( newConstantFromString, type->kind(), @@ -402,7 +412,9 @@ void SplitReader::setPartitionValue( connectorQueryCtx_->memoryPool(), connectorQueryCtx_->sessionTimezone(), hiveConfig_->readTimestampPartitionValueAsLocalTime( - connectorQueryCtx_->sessionProperties())); + connectorQueryCtx_->sessionProperties()), + partitionDataValueFormat.has_value() && + partitionDataValueFormat.value() == "days_since_epoch"); spec->setConstantValue(constant); } diff --git a/velox/connectors/hive/TableHandle.cpp b/velox/connectors/hive/TableHandle.cpp index 9e03adf47022..2062111d1638 100644 --- a/velox/connectors/hive/TableHandle.cpp +++ b/velox/connectors/hive/TableHandle.cpp @@ -63,6 +63,16 @@ folly::dynamic HiveColumnHandle::serialize() const { requiredSubfields.push_back(subfield.toString()); } obj["requiredSubfields"] = requiredSubfields; + + folly::dynamic columnParameters = folly::dynamic::array; + for (const auto& [key, value] : columnParameters_) { + folly::dynamic pair = folly::dynamic::object; + pair["key"] = key; + pair["value"] = value; + columnParameters.push_back(pair); + } + obj["columnParameters"] = columnParameters; + return obj; } @@ -77,7 +87,22 @@ std::string HiveColumnHandle::toString() const { for (const auto& subfield : requiredSubfields_) { out << " " << subfield.toString(); } - out << " ]]"; + out << " ]"; + + if (!columnParameters_.empty()) { + out << ", columnParameters: ["; + bool printSeparator = false; + for (const auto& [key, value] : columnParameters_) { + if (printSeparator) { + out << ", "; + } + out << "(" << key << ", " << value << ")"; + printSeparator = true; + } + out << "]"; + } + out << "]"; + return out.str(); } @@ -94,8 +119,20 @@ ColumnHandlePtr HiveColumnHandle::create(const folly::dynamic& obj) { requiredSubfields.emplace_back(s.asString()); } + std::unordered_map columnParameters; + folly::dynamic columnParametersObj = obj["columnParameters"]; + for (const auto& columnParameter : columnParametersObj) { + columnParameters[columnParameter["key"].asString()] = + columnParameter["value"].asString(); + } + return std::make_shared( - name, columnType, dataType, hiveType, std::move(requiredSubfields)); + name, + columnType, + dataType, + hiveType, + std::move(requiredSubfields), + columnParameters); } void HiveColumnHandle::registerSerDe() { diff --git a/velox/connectors/hive/TableHandle.h b/velox/connectors/hive/TableHandle.h index 14916f51a734..5584054646d6 100644 --- a/velox/connectors/hive/TableHandle.h +++ b/velox/connectors/hive/TableHandle.h @@ -45,12 +45,14 @@ class HiveColumnHandle : public ColumnHandle { ColumnType columnType, TypePtr dataType, TypePtr hiveType, - std::vector requiredSubfields = {}) + std::vector requiredSubfields = {}, + const std::unordered_map& columnParameters = {}) : name_(name), columnType_(columnType), dataType_(std::move(dataType)), hiveType_(std::move(hiveType)), - requiredSubfields_(std::move(requiredSubfields)) { + requiredSubfields_(std::move(requiredSubfields)), + columnParameters_(columnParameters) { VELOX_USER_CHECK( dataType_->equivalent(*hiveType_), "data type {} and hive type {} do not match", @@ -96,6 +98,18 @@ class HiveColumnHandle : public ColumnHandle { return columnType_ == ColumnType::kPartitionKey; } + const std::unordered_map& columnParameters() const { + return columnParameters_; + } + + const std::optional getColumnParameterValue( + const std::string& columnParameter) { + if (columnParameters_.count(columnParameter) > 0) { + return columnParameters_.at(columnParameter); + } + return {}; + } + std::string toString() const; folly::dynamic serialize() const override; @@ -115,6 +129,9 @@ class HiveColumnHandle : public ColumnHandle { const TypePtr dataType_; const TypePtr hiveType_; const std::vector requiredSubfields_; + // The column parameters are used for metadata like Iceberg date partition + // value format. + const std::unordered_map columnParameters_; }; class HiveTableHandle : public ConnectorTableHandle { diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 7416713868ad..80df31d9218a 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" @@ -225,8 +226,71 @@ class HiveIcebergTest : public HiveConnectorTestBase { ASSERT_TRUE(it->second.peakMemoryBytes > 0); } + void assertQuery( + RowTypePtr rowType, + const std::vector& dataVectors, + const std::string duckDbSql, + const std::unordered_map> + partitionKeys = {}, + const std::vector filters = {}) { + VELOX_CHECK(!duckDbSql.empty(), "DuckDb sql is empty"); + auto dataFilePath = TempFilePath::create(); + + writeToFile( + dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_); + std::vector> splits; + splits.emplace_back( + makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys)); + + std::unordered_set partitionColumns; + + for (auto partitionKey : partitionKeys) { + partitionColumns.insert(partitionKey.first); + } + + auto planBuilder = new PlanBuilder(pool_.get()); + auto plan = PlanBuilder::TableScanBuilder(*planBuilder) + .outputType(rowType) + .subfieldFilters(filters) + .partitionKeys(partitionColumns) + .tableFormat("iceberg") + .endTableScan() + .planNode(); + HiveConnectorTestBase::assertQuery(plan, splits, duckDbSql); + } + const static int rowCount = 20000; + protected: + std::shared_ptr config_; + std::function()> flushPolicyFactory_; + + std::shared_ptr makeIcebergSplit( + const std::string& dataFilePath, + const std::vector& deleteFiles = {}, + const std::unordered_map> + partitionKeys = {}) { + std::unordered_map customSplitInfo; + customSplitInfo["table_format"] = "hive-iceberg"; + + auto file = filesystems::getFileSystem(dataFilePath, nullptr) + ->openFileForRead(dataFilePath); + const int64_t fileSize = file->size(); + + return std::make_shared( + kHiveConnectorId, + dataFilePath, + fileFomat_, + 0, + fileSize, + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + /*cacheable=*/true, + deleteFiles); + } + private: std::map> writeDataFiles( std::map> rowGroupSizesForFiles) { @@ -335,31 +399,6 @@ class HiveIcebergTest : public HiveConnectorTestBase { return vectors; } - std::shared_ptr makeIcebergSplit( - const std::string& dataFilePath, - const std::vector& deleteFiles = {}) { - std::unordered_map> partitionKeys; - std::unordered_map customSplitInfo; - customSplitInfo["table_format"] = "hive-iceberg"; - - auto file = filesystems::getFileSystem(dataFilePath, nullptr) - ->openFileForRead(dataFilePath); - const int64_t fileSize = file->size(); - - return std::make_shared( - kHiveConnectorId, - dataFilePath, - fileFomat_, - 0, - fileSize, - partitionKeys, - std::nullopt, - customSplitInfo, - nullptr, - /*cacheable=*/true, - deleteFiles); - } - std::string getDuckDBQuery( const std::map>& rowGroupSizesForFiles, const std::unordered_map< @@ -478,8 +517,6 @@ class HiveIcebergTest : public HiveConnectorTestBase { } dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF}; - std::shared_ptr config_; - std::function()> flushPolicyFactory_; RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})}; std::shared_ptr pathColumn_ = @@ -660,4 +697,76 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) { assertMultipleSplits({}, 10, 3); } +TEST_F(HiveIcebergTest, testPartitionedRead) { + RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})}; + std::unordered_map> partitionKeys; + // Iceberg API sets partition values for dates to daysSinceEpoch, so + // in velox, we do not need to convert it to days. + // Test query on two partitions ds=17627(2018-04-06), ds=17628(2018-04-07) + std::vector> splits; + std::vector> dataFilePaths; + int32_t daysSinceEpoch; + for (int i = 0; i <= 1; ++i) { + std::vector dataVectors; + daysSinceEpoch = 17627 + i; + VectorPtr c0 = makeFlatVector((std::vector){i}); + VectorPtr ds = + makeFlatVector((std::vector){daysSinceEpoch}); + dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds})); + + auto dataFilePath = TempFilePath::create(); + dataFilePaths.push_back(dataFilePath); + writeToFile( + dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_); + partitionKeys["ds"] = std::to_string(daysSinceEpoch); + splits.emplace_back( + makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys)); + } + + std::unordered_set partitionColumns; + + for (auto partitionKey : partitionKeys) { + partitionColumns.insert(partitionKey.first); + } + + auto planBuilder = new PlanBuilder(pool_.get()); + auto plan = planBuilder->startTableScan() + .outputType(rowType) + .partitionKeys(partitionColumns) + .tableFormat("iceberg") + .endTableScan() + .planNode(); + HiveConnectorTestBase::assertQuery( + plan, + splits, + "SELECT * FROM (VALUES (0, '2018-04-06'), (1, '2018-04-07'))", + 0); + + // Filter on non-partitioned non-date column + std::vector nonPartitionFilters = {"c0 = 1"}; + plan = planBuilder->startTableScan() + .outputType(rowType) + .partitionKeys(partitionColumns) + .subfieldFilters(nonPartitionFilters) + .tableFormat("iceberg") + .endTableScan() + .planNode(); + HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 1, '2018-04-07'"); + + // Test filter on non-partitioned date column + std::vector filters = {"ds = date'2018-04-06'"}; + plan = planBuilder->startTableScan() + .outputType(rowType) + .subfieldFilters(filters) + .tableFormat("iceberg") + .endTableScan() + .planNode(); + splits.clear(); + for (auto dataFilePath : dataFilePaths) { + splits.emplace_back(makeIcebergSplit(dataFilePath->getPath(), {}, {})); + } + + HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 0, '2018-04-06'"); +} + } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/tests/TableHandleTest.cpp b/velox/connectors/hive/tests/TableHandleTest.cpp index 53cb95a27221..e1634eecaedc 100644 --- a/velox/connectors/hive/tests/TableHandleTest.cpp +++ b/velox/connectors/hive/tests/TableHandleTest.cpp @@ -30,8 +30,15 @@ TEST(FileHandleTest, hiveColumnHandle) { {"c0c1", ARRAY(MAP( VARCHAR(), ROW({{"c0c1c0", BIGINT()}, {"c0c1c1", BIGINT()}})))}}); + std::unordered_map columnParameters; + columnParameters["some.key"] = "some_value"; auto columnHandle = exec::test::HiveConnectorTestBase::makeColumnHandle( - "columnHandle", columnType, columnType, {"c0.c0c1[3][\"foo\"].c0c1c0"}); + "columnHandle", + columnType, + columnType, + {"c0.c0c1[3][\"foo\"].c0c1c0"}, + connector::hive::HiveColumnHandle::ColumnType::kRegular, + columnParameters); ASSERT_EQ(columnHandle->name(), "columnHandle"); ASSERT_EQ( columnHandle->columnType(), diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 987e0eb76462..097e58578da3 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -110,6 +110,12 @@ struct TableParameter { "serialization.null.format"; }; +struct ColumnParameter { + // Describes the format of the date partition value. + static constexpr const char* kPartitionDateValueFormat = + "partition.date.value.format"; +}; + /// Implicit row number column to be added. This column will be removed in the /// output of split reader. Should use the ScanSpec::ColumnType::kRowIndex if /// the column is suppose to be explicit and kept in the output. diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index bf17dd788d5e..1fb757e936dd 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -219,7 +219,8 @@ HiveConnectorTestBase::makeColumnHandle( const TypePtr& dataType, const TypePtr& hiveType, const std::vector& requiredSubfields, - connector::hive::HiveColumnHandle::ColumnType columnType) { + connector::hive::HiveColumnHandle::ColumnType columnType, + const std::unordered_map& columnParameters) { std::vector subfields; subfields.reserve(requiredSubfields.size()); for (auto& path : requiredSubfields) { @@ -227,7 +228,12 @@ HiveConnectorTestBase::makeColumnHandle( } return std::make_unique( - name, columnType, dataType, hiveType, std::move(subfields)); + name, + columnType, + dataType, + hiveType, + std::move(subfields), + columnParameters); } std::vector> diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 98ca4b803757..3056b0aa7fcd 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -148,7 +148,9 @@ class HiveConnectorTestBase : public OperatorTestBase { const TypePtr& hiveType, const std::vector& requiredSubfields, connector::hive::HiveColumnHandle::ColumnType columnType = - connector::hive::HiveColumnHandle::ColumnType::kRegular); + connector::hive::HiveColumnHandle::ColumnType::kRegular, + const std::unordered_map& columnParameters = + {}); /// @param targetDirectory Final directory of the target table after commit. /// @param writeDirectory Write directory of the target table before commit. diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 8739ae9cd965..5b04910c0ffb 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -74,7 +74,8 @@ PlanBuilder& PlanBuilder::tableScan( const RowTypePtr& dataColumns, const std::unordered_map< std::string, - std::shared_ptr>& assignments) { + std::shared_ptr>& assignments, + const std::unordered_set& partitionKeys) { return TableScanBuilder(*this) .outputType(outputType) .assignments(assignments) @@ -82,6 +83,7 @@ PlanBuilder& PlanBuilder::tableScan( .remainingFilter(remainingFilter) .dataColumns(dataColumns) .assignments(assignments) + .partitionKeys(partitionKeys) .endTableScan(); } @@ -170,13 +172,26 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) { } if (!hasAssignments) { + auto columnType = HiveColumnHandle::ColumnType::kRegular; + if (partitionKeys_.count(name) > 0) { + columnType = HiveColumnHandle::ColumnType::kPartitionKey; + } + std::unordered_map columnParameters; + if (tableFormat_ == "iceberg" && type->isDate()) { + columnParameters.insert( + {dwio::common::ColumnParameter::kPartitionDateValueFormat, + "days_since_epoch"}); + } + std::vector requiredSubFields; assignments_.insert( {name, std::make_shared( hiveColumnName, - HiveColumnHandle::ColumnType::kRegular, + columnType, + type, type, - type)}); + std::move(requiredSubFields), + columnParameters)}); } } diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 0c4c5ee68b52..840de300ac58 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -131,6 +131,7 @@ class PlanBuilder { /// you define the output types only. See 'missingColumns' test in /// 'TableScanTest'. /// @param assignments Optional ColumnHandles. + /// @param partitionKeys Optional partition keys. PlanBuilder& tableScan( const RowTypePtr& outputType, const std::vector& subfieldFilters = {}, @@ -138,7 +139,8 @@ class PlanBuilder { const RowTypePtr& dataColumns = nullptr, const std::unordered_map< std::string, - std::shared_ptr>& assignments = {}); + std::shared_ptr>& assignments = {}, + const std::unordered_set& partitionKeys = {}); /// Add a TableScanNode to scan a Hive table. /// @@ -277,6 +279,17 @@ class PlanBuilder { return *this; } + TableScanBuilder& partitionKeys( + std::unordered_set partitionKeys) { + partitionKeys_ = std::move(partitionKeys); + return *this; + } + + TableScanBuilder& tableFormat(std::string tableFormat) { + tableFormat_ = tableFormat; + return *this; + } + /// Stop the TableScanBuilder. PlanBuilder& endTableScan() { planBuilder_.planNode_ = build(planBuilder_.nextPlanNodeId()); @@ -298,6 +311,8 @@ class PlanBuilder { std::shared_ptr tableHandle_; std::unordered_map> assignments_; + std::unordered_set partitionKeys_; + std::string tableFormat_; }; /// Start a TableScanBuilder.