fix(iceberg): Date partition value parse issue
nmahadevuni committed Feb 11, 2025
1 parent 04bfdff commit 9e5d6d2
Showing 7 changed files with 228 additions and 40 deletions.
19 changes: 15 additions & 4 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -634,12 +634,18 @@ namespace {
bool applyPartitionFilter(
const TypePtr& type,
const std::string& partitionValue,
bool isPartitionDateDaysSinceEpoch,
common::Filter* filter) {
if (type->isDate()) {
const auto result = util::fromDateString(
StringView(partitionValue), util::ParseMode::kPrestoCast);
VELOX_CHECK(!result.hasError());
return applyFilter(*filter, result.value());
int32_t result = 0;
// days_since_epoch partition values are integers in string format, e.g.
// Iceberg partition values.
if (isPartitionDateDaysSinceEpoch) {
result = folly::to<int32_t>(partitionValue);
} else {
result = DATE()->toDays(static_cast<folly::StringPiece>(partitionValue));
}
return applyFilter(*filter, result);
}

switch (type->kind()) {
@@ -697,10 +703,15 @@ bool testFilters(
const auto handlesIter = partitionKeysHandle.find(name);
VELOX_CHECK(handlesIter != partitionKeysHandle.end());

auto partitionDateValueFormat =
handlesIter->second->getColumnParameterValue(
dwio::common::ColumnParameter::kPartitionDateValueFormat);
// This is a non-null partition key
return applyPartitionFilter(
handlesIter->second->dataType(),
iter->second.value(),
partitionDateValueFormat.has_value() &&
partitionDateValueFormat.value() == "days_since_epoch",
child->filter());
}
// Column is missing, most likely due to schema evolution. Or it's a
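The hunk above distinguishes two encodings of a date partition value: Hive-style splits carry a date literal such as "2018-04-06", while Iceberg splits carry the day offset from the Unix epoch itself, e.g. "17627". A minimal standalone sketch of the two parse paths (not the Velox API; parsePartitionDate is a hypothetical helper and daysFromCivil stands in for DATE()->toDays()):

#include <folly/Conv.h>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <string>

// Day offset from 1970-01-01 for a civil date (Howard Hinnant's algorithm).
int32_t daysFromCivil(int y, unsigned m, unsigned d) {
  y -= m <= 2;
  const int era = (y >= 0 ? y : y - 399) / 400;
  const unsigned yoe = static_cast<unsigned>(y - era * 400);
  const unsigned doy = (153 * (m + (m > 2 ? -3 : 9)) + 2) / 5 + d - 1;
  const unsigned doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
  return era * 146097 + static_cast<int>(doe) - 719468;
}

int32_t parsePartitionDate(const std::string& value, bool isDaysSinceEpoch) {
  if (isDaysSinceEpoch) {
    // Iceberg-style value: already the day offset, e.g. "17627".
    return folly::to<int32_t>(value);
  }
  // Hive-style value: a date literal, e.g. "2018-04-06".
  int y = 0;
  unsigned m = 0, d = 0;
  std::sscanf(value.c_str(), "%d-%u-%u", &y, &m, &d);
  return daysFromCivil(y, m, d);
}

int main() {
  // Both encodings of 2018-04-06 resolve to the same day offset.
  assert(parsePartitionDate("17627", /*isDaysSinceEpoch=*/true) == 17627);
  assert(parsePartitionDate("2018-04-06", /*isDaysSinceEpoch=*/false) == 17627);
  return 0;
}

Before this fix, the date branch always went through the date-string parser, which is how Iceberg's integer-encoded values could fail to parse.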
18 changes: 15 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
@@ -35,14 +35,22 @@ VectorPtr newConstantFromString(
const std::optional<std::string>& value,
vector_size_t size,
velox::memory::MemoryPool* pool,
const std::string& sessionTimezone) {
const std::string& sessionTimezone,
bool isPartitionDateDaysSinceEpoch = false) {
using T = typename TypeTraits<kind>::NativeType;
if (!value.has_value()) {
return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
}

if (type->isDate()) {
auto days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
int32_t days = 0;
// For Iceberg, the date partition values are already in daysSinceEpoch
// form.
if (isPartitionDateDaysSinceEpoch) {
days = folly::to<int32_t>(value.value());
} else {
days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
}
return std::make_shared<ConstantVector<int32_t>>(
pool, size, false, type, std::move(days));
}
@@ -388,14 +396,18 @@ void SplitReader::setPartitionValue(
"ColumnHandle is missing for partition key {}",
partitionKey);
auto type = it->second->dataType();
auto partitionDataValueFormat = it->second->getColumnParameterValue(
dwio::common::ColumnParameter::kPartitionDateValueFormat);
auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
newConstantFromString,
type->kind(),
type,
value,
1,
connectorQueryCtx_->memoryPool(),
connectorQueryCtx_->sessionTimezone());
connectorQueryCtx_->sessionTimezone(),
partitionDataValueFormat.has_value() &&
partitionDataValueFormat.value() == "days_since_epoch");
spec->setConstantValue(constant);
}

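Both call sites of the new flag (applyPartitionFilter in HiveConnectorUtil.cpp above and setPartitionValue here) derive it the same way from the column handle. A sketch of that shared guard, with handle standing in for the HiveColumnHandle:

// The flag is true only when the column parameter is present and set to
// "days_since_epoch"; an absent or different value keeps the old
// string-parsing path.
const auto format = handle->getColumnParameterValue(
    dwio::common::ColumnParameter::kPartitionDateValueFormat);
const bool isPartitionDateDaysSinceEpoch =
    format.has_value() && format.value() == "days_since_epoch";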
21 changes: 19 additions & 2 deletions velox/connectors/hive/TableHandle.h
@@ -45,12 +45,14 @@ class HiveColumnHandle : public ColumnHandle {
ColumnType columnType,
TypePtr dataType,
TypePtr hiveType,
std::vector<common::Subfield> requiredSubfields = {})
std::vector<common::Subfield> requiredSubfields = {},
const std::unordered_map<std::string, std::string>& columnParameters = {})
: name_(name),
columnType_(columnType),
dataType_(std::move(dataType)),
hiveType_(std::move(hiveType)),
requiredSubfields_(std::move(requiredSubfields)) {
requiredSubfields_(std::move(requiredSubfields)),
columnParameters_(columnParameters) {
VELOX_USER_CHECK(
dataType_->equivalent(*hiveType_),
"data type {} and hive type {} do not match",
@@ -96,6 +98,18 @@ class HiveColumnHandle : public ColumnHandle {
return columnType_ == ColumnType::kPartitionKey;
}

const std::unordered_map<std::string, std::string>& columnParameters() const {
return columnParameters_;
}

const std::optional<std::string> getColumnParameterValue(
const std::string& columnParameter) {
if (columnParameters_.count(columnParameter) > 0) {
return columnParameters_.at(columnParameter);
}
return {};
}

std::string toString() const;

folly::dynamic serialize() const override;
@@ -115,6 +129,9 @@
const TypePtr dataType_;
const TypePtr hiveType_;
const std::vector<common::Subfield> requiredSubfields_;
// The column parameters are used for metadata like the Iceberg date
// partition value format.
const std::unordered_map<std::string, std::string> columnParameters_;
};

class HiveTableHandle : public ConnectorTableHandle {
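As an illustration of the new constructor parameter, a sketch of building a partition-key handle for an Iceberg date column (it mirrors the PlanBuilder change later in this commit; the column name "ds" is illustrative):

std::unordered_map<std::string, std::string> columnParameters{
    {dwio::common::ColumnParameter::kPartitionDateValueFormat,
     "days_since_epoch"}};
auto handle = std::make_shared<HiveColumnHandle>(
    "ds",
    HiveColumnHandle::ColumnType::kPartitionKey,
    DATE(), // dataType
    DATE(), // hiveType
    std::vector<common::Subfield>{},
    columnParameters);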
166 changes: 139 additions & 27 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
@@ -225,8 +226,71 @@ class HiveIcebergTest : public HiveConnectorTestBase {
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
}

void assertQuery(
RowTypePtr rowType,
const std::vector<RowVectorPtr>& dataVectors,
const std::string duckDbSql,
const std::unordered_map<std::string, std::optional<std::string>>
partitionKeys = {},
const std::vector<std::string> filters = {}) {
VELOX_CHECK(!duckDbSql.empty(), "DuckDb sql is empty");
auto dataFilePath = TempFilePath::create();

writeToFile(
dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
std::vector<std::shared_ptr<ConnectorSplit>> splits;
splits.emplace_back(
makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));

std::unordered_set<std::string> partitionColumns;

for (auto partitionKey : partitionKeys) {
partitionColumns.insert(partitionKey.first);
}

auto planBuilder = new PlanBuilder(pool_.get());
auto plan = PlanBuilder::TableScanBuilder(*planBuilder)
.outputType(rowType)
.subfieldFilters(filters)
.partitionKeys(partitionColumns)
.tableFormat("iceberg")
.endTableScan()
.planNode();
HiveConnectorTestBase::assertQuery(plan, splits, duckDbSql);
}

const static int rowCount = 20000;

protected:
std::shared_ptr<dwrf::Config> config_;
std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;

std::shared_ptr<ConnectorSplit> makeIcebergSplit(
const std::string& dataFilePath,
const std::vector<IcebergDeleteFile>& deleteFiles = {},
const std::unordered_map<std::string, std::optional<std::string>>
partitionKeys = {}) {
std::unordered_map<std::string, std::string> customSplitInfo;
customSplitInfo["table_format"] = "hive-iceberg";

auto file = filesystems::getFileSystem(dataFilePath, nullptr)
->openFileForRead(dataFilePath);
const int64_t fileSize = file->size();

return std::make_shared<HiveIcebergSplit>(
kHiveConnectorId,
dataFilePath,
fileFomat_,
0,
fileSize,
partitionKeys,
std::nullopt,
customSplitInfo,
nullptr,
/*cacheable=*/true,
deleteFiles);
}

private:
std::map<std::string, std::shared_ptr<TempFilePath>> writeDataFiles(
std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles) {
@@ -335,31 +399,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
return vectors;
}

std::shared_ptr<ConnectorSplit> makeIcebergSplit(
const std::string& dataFilePath,
const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
std::unordered_map<std::string, std::string> customSplitInfo;
customSplitInfo["table_format"] = "hive-iceberg";

auto file = filesystems::getFileSystem(dataFilePath, nullptr)
->openFileForRead(dataFilePath);
const int64_t fileSize = file->size();

return std::make_shared<HiveIcebergSplit>(
kHiveConnectorId,
dataFilePath,
fileFomat_,
0,
fileSize,
partitionKeys,
std::nullopt,
customSplitInfo,
nullptr,
/*cacheable=*/true,
deleteFiles);
}

std::string getDuckDBQuery(
const std::map<std::string, std::vector<int64_t>>& rowGroupSizesForFiles,
const std::unordered_map<
@@ -478,8 +517,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
}

dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF};
std::shared_ptr<dwrf::Config> config_;
std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;

RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})};
std::shared_ptr<IcebergMetadataColumn> pathColumn_ =
@@ -660,4 +697,79 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
assertMultipleSplits({}, 10, 3);
}

TEST_F(HiveIcebergTest, testPartitionedRead) {
RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})};
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
// The Iceberg API sets date partition values to daysSinceEpoch, so in
// Velox we do not need to convert them to days.
// Date = 2018-04-06, daysSinceEpoch = 17627
partitionKeys["ds"] = "17627";

// Test query on two partitions ds=17627, ds=17628
std::vector<std::shared_ptr<ConnectorSplit>> splits;
std::vector<std::shared_ptr<TempFilePath>> dataFilePaths;
int32_t daysSinceEpoch;
for (int i = 0; i <= 1; ++i) {
std::vector<RowVectorPtr> dataVectors;
daysSinceEpoch = 17627 + i;
VectorPtr c0 = makeFlatVector<int64_t>((std::vector<int64_t>){i});
VectorPtr ds =
makeFlatVector<int32_t>((std::vector<int32_t>){daysSinceEpoch});
dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds}));

auto dataFilePath = TempFilePath::create();
dataFilePaths.push_back(dataFilePath);
writeToFile(
dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
partitionKeys["ds"] = std::to_string(daysSinceEpoch);
splits.emplace_back(
makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));
}

std::unordered_set<std::string> partitionColumns;

for (auto partitionKey : partitionKeys) {
partitionColumns.insert(partitionKey.first);
}

auto planBuilder = new PlanBuilder(pool_.get());
auto plan = planBuilder->startTableScan()
.outputType(rowType)
.partitionKeys(partitionColumns)
.tableFormat("iceberg")
.endTableScan()
.planNode();
HiveConnectorTestBase::assertQuery(
plan,
splits,
"SELECT * FROM (VALUES (0, '2018-04-06'), (1, '2018-04-07'))",
0);

// Filter on non-partitioned non-date column
std::vector<std::string> nonPartitionFilters = {"c0 = 1"};
plan = planBuilder->startTableScan()
.outputType(rowType)
.partitionKeys(partitionColumns)
.subfieldFilters(nonPartitionFilters)
.tableFormat("iceberg")
.endTableScan()
.planNode();
HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 1, '2018-04-07'");

// Test filter on non-partitioned date column
std::vector<std::string> filters = {"ds = date'2018-04-06'"};
plan = planBuilder->startTableScan()
.outputType(rowType)
.subfieldFilters(filters)
.tableFormat("iceberg")
.endTableScan()
.planNode();
splits.clear();
for (auto dataFilePath : dataFilePaths) {
splits.emplace_back(makeIcebergSplit(dataFilePath->getPath(), {}, {}));
}

HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 0, '2018-04-06'");
}

} // namespace facebook::velox::connector::hive::iceberg
6 changes: 6 additions & 0 deletions velox/dwio/common/Options.h
@@ -110,6 +110,12 @@ struct TableParameter {
"serialization.null.format";
};

struct ColumnParameter {
// Describes the format of the date partition value.
static constexpr const char* kPartitionDateValueFormat =
"partition.date.value.format";
};

/// Implicit row number column to be added. This column will be removed in the
/// output of split reader. Should use the ScanSpec::ColumnType::kRowIndex if
/// the column is supposed to be explicit and kept in the output.
21 changes: 18 additions & 3 deletions velox/exec/tests/utils/PlanBuilder.cpp
@@ -74,14 +74,16 @@ PlanBuilder& PlanBuilder::tableScan(
const RowTypePtr& dataColumns,
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& assignments) {
std::shared_ptr<connector::ColumnHandle>>& assignments,
const std::unordered_set<std::string>& partitionKeys) {
return TableScanBuilder(*this)
.outputType(outputType)
.assignments(assignments)
.subfieldFilters(subfieldFilters)
.remainingFilter(remainingFilter)
.dataColumns(dataColumns)
.assignments(assignments)
.partitionKeys(partitionKeys)
.endTableScan();
}

@@ -170,13 +172,26 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) {
}

if (!hasAssignments) {
auto columnType = HiveColumnHandle::ColumnType::kRegular;
if (partitionKeys_.count(name) > 0) {
columnType = HiveColumnHandle::ColumnType::kPartitionKey;
}
std::unordered_map<std::string, std::string> columnParameters;
if (tableFormat_ == "iceberg" && type->isDate()) {
columnParameters.insert(
{dwio::common::ColumnParameter::kPartitionDateValueFormat,
"days_since_epoch"});
}
std::vector<common::Subfield> requiredSubFields;
assignments_.insert(
{name,
std::make_shared<HiveColumnHandle>(
hiveColumnName,
HiveColumnHandle::ColumnType::kRegular,
columnType,
type,
type,
type)});
std::move(requiredSubFields),
columnParameters)});
}
}
