Skip to content

Commit

Permalink
fix(iceberg): Date partition value parse issue
Browse files Browse the repository at this point in the history
  • Loading branch information
nmahadevuni committed Feb 5, 2025
1 parent ce273fa commit 2f5fa0a
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 15 deletions.
15 changes: 11 additions & 4 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,12 +634,18 @@ namespace {
bool applyPartitionFilter(
const TypePtr& type,
const std::string& partitionValue,
HiveColumnHandle::ValueType valueType,
common::Filter* filter) {
if (type->isDate()) {
const auto result = util::fromDateString(
StringView(partitionValue), util::ParseMode::kPrestoCast);
VELOX_CHECK(!result.hasError());
return applyFilter(*filter, result.value());
int32_t result = 0;
// Iceberg partition values are already in daysSinceEpoch, no need to
// convert.
if (valueType == HiveColumnHandle::ValueType::kDaysSinceEpoch) {
result = folly::to<int32_t>(partitionValue);
} else {
result = DATE()->toDays(static_cast<folly::StringPiece>(partitionValue));
}
return applyFilter(*filter, result);
}

switch (type->kind()) {
Expand Down Expand Up @@ -701,6 +707,7 @@ bool testFilters(
return applyPartitionFilter(
handlesIter->second->dataType(),
iter->second.value(),
handlesIter->second->valueType(),
child->filter());
}
// Column is missing, most likely due to schema evolution. Or it's a
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/FileHandle.h"
#include "velox/connectors/hive/SplitReader.h"
#include "velox/dwio/common/BufferedInput.h"
#include "velox/dwio/common/Reader.h"

Expand Down
16 changes: 13 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,23 @@ VectorPtr newConstantFromString(
const std::optional<std::string>& value,
vector_size_t size,
velox::memory::MemoryPool* pool,
const std::string& sessionTimezone) {
const std::string& sessionTimezone,
HiveColumnHandle::ValueType valueType =
HiveColumnHandle::ValueType::kDefault) {
using T = typename TypeTraits<kind>::NativeType;
if (!value.has_value()) {
return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
}

if (type->isDate()) {
auto days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
int32_t days = 0;
// For Iceberg, the date partition values are already in daysSinceEpoch
// form.
if (valueType == HiveColumnHandle::ValueType::kDaysSinceEpoch) {
days = folly::to<int32_t>(value.value());
} else {
days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
}
return std::make_shared<ConstantVector<int32_t>>(
pool, size, false, type, std::move(days));
}
Expand Down Expand Up @@ -389,7 +398,8 @@ void SplitReader::setPartitionValue(
value,
1,
connectorQueryCtx_->memoryPool(),
connectorQueryCtx_->sessionTimezone());
connectorQueryCtx_->sessionTimezone(),
it->second->valueType());
spec->setConstantValue(constant);
}

Expand Down
13 changes: 11 additions & 2 deletions velox/connectors/hive/TableHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class HiveColumnHandle : public ColumnHandle {
kRowId,
};

/// Encoding of a partition value string carried in the split.
/// kDefault: human-readable form that must be parsed per the column type
/// (e.g. a date string). kDaysSinceEpoch: an integer count of days since
/// the Unix epoch, as produced by Iceberg's date partition transform, which
/// must NOT be re-parsed as a date string.
enum class ValueType { kDefault, kDaysSinceEpoch };

/// NOTE: 'dataType' is the column type in target write table. 'hiveType' is
/// converted type of the corresponding column in source table which might not
/// be the same type, and the table scan needs to do data coercion if needs.
Expand All @@ -45,12 +47,14 @@ class HiveColumnHandle : public ColumnHandle {
ColumnType columnType,
TypePtr dataType,
TypePtr hiveType,
std::vector<common::Subfield> requiredSubfields = {})
std::vector<common::Subfield> requiredSubfields = {},
ValueType valueType = ValueType::kDefault)
: name_(name),
columnType_(columnType),
dataType_(std::move(dataType)),
hiveType_(std::move(hiveType)),
requiredSubfields_(std::move(requiredSubfields)) {
requiredSubfields_(std::move(requiredSubfields)),
valueType_(valueType) {
VELOX_USER_CHECK(
dataType_->equivalent(*hiveType_),
"data type {} and hive type {} do not match",
Expand Down Expand Up @@ -96,6 +100,10 @@ class HiveColumnHandle : public ColumnHandle {
return columnType_ == ColumnType::kPartitionKey;
}

/// Returns how this column's partition values are encoded (see ValueType).
/// Const-qualified so it can be called through const handles; the previous
/// signature returned a meaningless top-level const value and could not be
/// invoked on a const HiveColumnHandle.
ValueType valueType() const {
  return valueType_;
}

std::string toString() const;

folly::dynamic serialize() const override;
Expand All @@ -115,6 +123,7 @@ class HiveColumnHandle : public ColumnHandle {
const TypePtr dataType_;
const TypePtr hiveType_;
const std::vector<common::Subfield> requiredSubfields_;
const ValueType valueType_;
};

class HiveTableHandle : public ConnectorTableHandle {
Expand Down
65 changes: 63 additions & 2 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
Expand Down Expand Up @@ -225,6 +226,44 @@ class HiveIcebergTest : public HiveConnectorTestBase {
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
}

void assertQuery(
RowTypePtr rowType,
const std::vector<RowVectorPtr>& dataVectors,
const std::string duckDbSql,
const std::unordered_map<std::string, std::optional<std::string>>
partitionKeys = {},
const std::vector<std::string> filters = {}) {
VELOX_CHECK(!duckDbSql.empty(), "DuckDb sql is empty");
auto dataFilePath = TempFilePath::create();

writeToFile(
dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
std::vector<std::shared_ptr<ConnectorSplit>> splits;
splits.emplace_back(
makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));

std::unordered_set<std::string> partitionColumns;

for (auto partitionKey : partitionKeys) {
partitionColumns.insert(partitionKey.first);
}

// auto plan =
// PlanBuilder(pool_.get())
// .tableScan(rowType, filters, "", nullptr, {},
// partitionColumns) .planNode();

auto planBuilder = new PlanBuilder(pool_.get());
auto plan = PlanBuilder::TableScanBuilder(*planBuilder)
.outputType(rowType)
.subfieldFilters(filters)
.partitionKeys(partitionColumns)
.tableFormat("iceberg")
.endTableScan()
.planNode();
HiveConnectorTestBase::assertQuery(plan, splits, duckDbSql);
}

const static int rowCount = 20000;

private:
Expand Down Expand Up @@ -337,8 +376,9 @@ class HiveIcebergTest : public HiveConnectorTestBase {

std::shared_ptr<ConnectorSplit> makeIcebergSplit(
const std::string& dataFilePath,
const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
const std::vector<IcebergDeleteFile>& deleteFiles = {},
const std::unordered_map<std::string, std::optional<std::string>>
partitionKeys = {}) {
std::unordered_map<std::string, std::string> customSplitInfo;
customSplitInfo["table_format"] = "hive-iceberg";

Expand Down Expand Up @@ -660,4 +700,25 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
assertMultipleSplits({}, 10, 3);
}

// Verifies that Iceberg date partition values, which arrive as
// daysSinceEpoch integers rather than date strings, are resolved and
// filtered correctly. Date 2018-04-06 == day 17627 since the Unix epoch.
TEST_F(HiveIcebergTest, testPartitionedRead) {
  RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})};
  std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
  // Iceberg API sets partition values for dates to daysSinceEpoch, so
  // Velox must not re-parse them as date strings.
  partitionKeys["ds"] = "17627";

  // Use standard braced construction; the previous
  // '(std::vector<int64_t>){1}' compound-literal cast is a non-standard
  // GNU extension in C++.
  std::vector<RowVectorPtr> dataVectors;
  VectorPtr c0 = makeFlatVector<int64_t>(std::vector<int64_t>{1});
  VectorPtr ds = makeFlatVector<int32_t>(std::vector<int32_t>{17627});
  dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds}));

  // Unfiltered read: the partition value comes from the split metadata.
  assertQuery(
      rowType, dataVectors, "SELECT 1, '2018-04-06'", partitionKeys, {});

  // Filtered read: the date filter must be evaluated against day numbers.
  std::vector<std::string> filters = {"ds = date'2018-04-06'"};
  assertQuery(
      rowType, dataVectors, "SELECT 1, '2018-04-06'", partitionKeys, filters);
}

} // namespace facebook::velox::connector::hive::iceberg
19 changes: 16 additions & 3 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ PlanBuilder& PlanBuilder::tableScan(
const RowTypePtr& dataColumns,
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& assignments) {
std::shared_ptr<connector::ColumnHandle>>& assignments,
const std::unordered_set<std::string>& partitionKeys) {
return TableScanBuilder(*this)
.outputType(outputType)
.assignments(assignments)
.subfieldFilters(subfieldFilters)
.remainingFilter(remainingFilter)
.dataColumns(dataColumns)
.assignments(assignments)
.partitionKeys(partitionKeys)
.endTableScan();
}

Expand Down Expand Up @@ -170,13 +172,24 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) {
}

if (!hasAssignments) {
auto columnType = HiveColumnHandle::ColumnType::kRegular;
if (partitionKeys_.count(name) > 0) {
columnType = HiveColumnHandle::ColumnType::kPartitionKey;
}
auto valueType = HiveColumnHandle::ValueType::kDefault;
if (tableFormat_ == "iceberg" && type->isDate()) {
valueType = HiveColumnHandle::ValueType::kDaysSinceEpoch;
}
std::vector<common::Subfield> requiredSubFields;
assignments_.insert(
{name,
std::make_shared<HiveColumnHandle>(
hiveColumnName,
HiveColumnHandle::ColumnType::kRegular,
columnType,
type,
type,
type)});
std::move(requiredSubFields),
valueType)});
}
}

Expand Down
17 changes: 16 additions & 1 deletion velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,16 @@ class PlanBuilder {
/// you define the output types only. See 'missingColumns' test in
/// 'TableScanTest'.
/// @param assignments Optional ColumnHandles.
/// @param partitionKeys Optional partition keys.
PlanBuilder& tableScan(
const RowTypePtr& outputType,
const std::vector<std::string>& subfieldFilters = {},
const std::string& remainingFilter = "",
const RowTypePtr& dataColumns = nullptr,
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& assignments = {});
std::shared_ptr<connector::ColumnHandle>>& assignments = {},
const std::unordered_set<std::string>& partitionKeys = {});

/// Add a TableScanNode to scan a Hive table.
///
Expand Down Expand Up @@ -277,6 +279,17 @@ class PlanBuilder {
return *this;
}

/// Specifies which output columns are partition keys; columns named here
/// receive kPartitionKey column handles instead of kRegular ones.
TableScanBuilder& partitionKeys(
    std::unordered_set<std::string> partitionKeys) {
  partitionKeys_.swap(partitionKeys);
  return *this;
}

/// Sets the table format (e.g. "iceberg"); affects how date partition
/// values are interpreted when building default column handles.
/// Moves the sink parameter instead of copying it, matching the sibling
/// partitionKeys() setter.
TableScanBuilder& tableFormat(std::string tableFormat) {
  tableFormat_ = std::move(tableFormat);
  return *this;
}

/// Stop the TableScanBuilder.
PlanBuilder& endTableScan() {
planBuilder_.planNode_ = build(planBuilder_.nextPlanNodeId());
Expand All @@ -298,6 +311,8 @@ class PlanBuilder {
std::shared_ptr<connector::ConnectorTableHandle> tableHandle_;
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
assignments_;
std::unordered_set<std::string> partitionKeys_;
std::string tableFormat_;
};

/// Start a TableScanBuilder.
Expand Down

0 comments on commit 2f5fa0a

Please sign in to comment.