fix(iceberg): Date partition value parse issue
nmahadevuni committed Feb 12, 2025
1 parent f781b74 commit 3804e10
Showing 11 changed files with 283 additions and 46 deletions.
19 changes: 15 additions & 4 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -634,12 +634,18 @@ namespace {
bool applyPartitionFilter(
const TypePtr& type,
const std::string& partitionValue,
bool isPartitionDateDaysSinceEpoch,
common::Filter* filter) {
if (type->isDate()) {
const auto result = util::fromDateString(
StringView(partitionValue), util::ParseMode::kPrestoCast);
VELOX_CHECK(!result.hasError());
return applyFilter(*filter, result.value());
int32_t result = 0;
// days_since_epoch partition values are integers in string format, e.g.
// Iceberg partition values.
if (isPartitionDateDaysSinceEpoch) {
result = folly::to<int32_t>(partitionValue);
} else {
result = DATE()->toDays(static_cast<folly::StringPiece>(partitionValue));
}
return applyFilter(*filter, result);
}

switch (type->kind()) {
@@ -697,10 +703,15 @@ bool testFilters(
const auto handlesIter = partitionKeysHandle.find(name);
VELOX_CHECK(handlesIter != partitionKeysHandle.end());

auto partitionDateValueFormat =
handlesIter->second->getColumnParameterValue(
dwio::common::ColumnParameter::kPartitionDateValueFormat);
// This is a non-null partition key
return applyPartitionFilter(
handlesIter->second->dataType(),
iter->second.value(),
partitionDateValueFormat.has_value() &&
partitionDateValueFormat.value() == "days_since_epoch",
child->filter());
}
// Column is missing, most likely due to schema evolution. Or it's a
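The crux of the fix is that a DATE partition value now arrives in one of two string encodings: a Hive-style calendar date ("2018-04-06") or an Iceberg-style days-since-epoch integer ("17627"). Below is a minimal standalone sketch of the two branches in applyPartitionFilter above — not Velox code; the helper name is hypothetical, and C++20 <chrono> stands in for Velox's DATE()->toDays:

#include <cassert>
#include <charconv>
#include <chrono>
#include <cstdint>
#include <string>

// Hypothetical stand-in for the two parse paths: Iceberg writes "17627"
// (days since epoch), Hive writes "2018-04-06" (a calendar date string).
int32_t parseDatePartitionValue(const std::string& value, bool daysSinceEpoch) {
  if (daysSinceEpoch) {
    int32_t days = 0; // "17627" -> 17627, mirroring folly::to<int32_t>.
    std::from_chars(value.data(), value.data() + value.size(), days);
    return days;
  }
  // Handles only "YYYY-MM-DD" here; Velox's DATE()->toDays accepts more.
  const int y = std::stoi(value.substr(0, 4));
  const unsigned m = std::stoul(value.substr(5, 2));
  const unsigned d = std::stoul(value.substr(8, 2));
  return std::chrono::sys_days{std::chrono::year{y} / std::chrono::month{m} /
                               std::chrono::day{d}}
      .time_since_epoch()
      .count();
}

int main() {
  // The same date in both encodings, as used by the new test below.
  assert(parseDatePartitionValue("2018-04-06", /*daysSinceEpoch=*/false) == 17627);
  assert(parseDatePartitionValue("17627", /*daysSinceEpoch=*/true) == 17627);
  return 0;
}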
18 changes: 15 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
@@ -36,14 +36,22 @@ VectorPtr newConstantFromString(
vector_size_t size,
velox::memory::MemoryPool* pool,
const std::string& sessionTimezone,
bool asLocalTime) {
bool asLocalTime,
bool isPartitionDateDaysSinceEpoch = false) {
using T = typename TypeTraits<kind>::NativeType;
if (!value.has_value()) {
return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
}

if (type->isDate()) {
auto days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
int32_t days = 0;
// For Iceberg, the date partition values are already in daysSinceEpoch
// form.
if (isPartitionDateDaysSinceEpoch) {
days = folly::to<int32_t>(value.value());
} else {
days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
}
return std::make_shared<ConstantVector<int32_t>>(
pool, size, false, type, std::move(days));
}
@@ -393,6 +401,8 @@ void SplitReader::setPartitionValue(
"ColumnHandle is missing for partition key {}",
partitionKey);
auto type = it->second->dataType();
auto partitionDateValueFormat = it->second->getColumnParameterValue(
dwio::common::ColumnParameter::kPartitionDateValueFormat);
auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
newConstantFromString,
type->kind(),
@@ -402,7 +412,9 @@
connectorQueryCtx_->memoryPool(),
connectorQueryCtx_->sessionTimezone(),
hiveConfig_->readTimestampPartitionValueAsLocalTime(
connectorQueryCtx_->sessionProperties()));
connectorQueryCtx_->sessionProperties()),
partitionDateValueFormat.has_value() &&
partitionDateValueFormat.value() == "days_since_epoch");
spec->setConstantValue(constant);
}

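For a DATE partition key, the value materializes as one shared constant per split. A fragment sketched against the ConstantVector call above, where pool and numRows are placeholders for the memory pool and the split's row count:

// "17627" (2018-04-06) becomes a single days-since-epoch value shared by
// every row of the split, typed as DATE.
int32_t days = folly::to<int32_t>("17627");
auto constant = std::make_shared<ConstantVector<int32_t>>(
    pool, /*length=*/numRows, /*isNull=*/false, DATE(), std::move(days));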
41 changes: 39 additions & 2 deletions velox/connectors/hive/TableHandle.cpp
@@ -63,6 +63,16 @@ folly::dynamic HiveColumnHandle::serialize() const {
requiredSubfields.push_back(subfield.toString());
}
obj["requiredSubfields"] = requiredSubfields;

folly::dynamic columnParameters = folly::dynamic::array;
for (const auto& [key, value] : columnParameters_) {
folly::dynamic pair = folly::dynamic::object;
pair["key"] = key;
pair["value"] = value;
columnParameters.push_back(pair);
}
obj["columnParameters"] = columnParameters;

return obj;
}

@@ -77,7 +87,22 @@ std::string HiveColumnHandle::toString() const {
for (const auto& subfield : requiredSubfields_) {
out << " " << subfield.toString();
}
out << " ]]";
out << " ]";

if (!columnParameters_.empty()) {
out << ", columnParameters: [";
bool printSeparator = false;
for (const auto& [key, value] : columnParameters_) {
if (printSeparator) {
out << ", ";
}
out << "(" << key << ", " << value << ")";
printSeparator = true;
}
out << "]";
}
out << "]";

return out.str();
}

@@ -94,8 +119,20 @@ ColumnHandlePtr HiveColumnHandle::create(const folly::dynamic& obj) {
requiredSubfields.emplace_back(s.asString());
}

std::unordered_map<std::string, std::string> columnParameters;
folly::dynamic columnParametersObj = obj["columnParameters"];
for (const auto& columnParameter : columnParametersObj) {
columnParameters[columnParameter["key"].asString()] =
columnParameter["value"].asString();
}

return std::make_shared<HiveColumnHandle>(
name, columnType, dataType, hiveType, std::move(requiredSubfields));
name,
columnType,
dataType,
hiveType,
std::move(requiredSubfields),
columnParameters);
}

void HiveColumnHandle::registerSerDe() {
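For reference, a standalone sketch of the wire shape serialize() gives the new field. The key string here is made up; the real one behind dwio::common::ColumnParameter::kPartitionDateValueFormat is defined elsewhere in Velox:

#include <folly/dynamic.h>
#include <folly/json.h>
#include <iostream>

int main() {
  // One {key, value} object per column parameter, as in serialize() above.
  folly::dynamic pair = folly::dynamic::object;
  pair["key"] = "partition.date.value.format"; // hypothetical key string
  pair["value"] = "days_since_epoch";

  folly::dynamic columnParameters = folly::dynamic::array;
  columnParameters.push_back(pair);

  folly::dynamic obj = folly::dynamic::object;
  obj["columnParameters"] = columnParameters;

  std::cout << folly::toJson(obj) << '\n';
  // {"columnParameters":[{"key":"partition.date.value.format","value":"days_since_epoch"}]}
  return 0;
}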
21 changes: 19 additions & 2 deletions velox/connectors/hive/TableHandle.h
@@ -45,12 +45,14 @@ class HiveColumnHandle : public ColumnHandle {
ColumnType columnType,
TypePtr dataType,
TypePtr hiveType,
std::vector<common::Subfield> requiredSubfields = {})
std::vector<common::Subfield> requiredSubfields = {},
const std::unordered_map<std::string, std::string>& columnParameters = {})
: name_(name),
columnType_(columnType),
dataType_(std::move(dataType)),
hiveType_(std::move(hiveType)),
requiredSubfields_(std::move(requiredSubfields)) {
requiredSubfields_(std::move(requiredSubfields)),
columnParameters_(columnParameters) {
VELOX_USER_CHECK(
dataType_->equivalent(*hiveType_),
"data type {} and hive type {} do not match",
@@ -96,6 +98,18 @@
return columnType_ == ColumnType::kPartitionKey;
}

const std::unordered_map<std::string, std::string>& columnParameters() const {
return columnParameters_;
}

std::optional<std::string> getColumnParameterValue(
const std::string& columnParameter) const {
const auto it = columnParameters_.find(columnParameter);
if (it != columnParameters_.end()) {
return it->second;
}
return std::nullopt;
}

std::string toString() const;

folly::dynamic serialize() const override;
@@ -115,6 +129,9 @@
const TypePtr dataType_;
const TypePtr hiveType_;
const std::vector<common::Subfield> requiredSubfields_;
// The column parameters are used for metadata like Iceberg date partition
// value format.
const std::unordered_map<std::string, std::string> columnParameters_;
};

class HiveTableHandle : public ConnectorTableHandle {
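A hedged usage sketch of the new constructor parameter and accessor, assuming Velox's DATE() type factory and the kPartitionDateValueFormat constant referenced above (a fragment, not a full compilation unit):

// Tag a DATE partition column so readers know its values are encoded as
// days since epoch (the Iceberg convention) rather than as date strings.
auto handle = std::make_shared<HiveColumnHandle>(
    "ds",
    HiveColumnHandle::ColumnType::kPartitionKey,
    DATE(),
    DATE(),
    std::vector<common::Subfield>{},
    std::unordered_map<std::string, std::string>{
        {dwio::common::ColumnParameter::kPartitionDateValueFormat,
         "days_since_epoch"}});

auto format = handle->getColumnParameterValue(
    dwio::common::ColumnParameter::kPartitionDateValueFormat);
// format.has_value() && format.value() == "days_since_epoch"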
163 changes: 136 additions & 27 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
@@ -225,8 +226,71 @@ class HiveIcebergTest : public HiveConnectorTestBase {
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
}

void assertQuery(
RowTypePtr rowType,
const std::vector<RowVectorPtr>& dataVectors,
const std::string& duckDbSql,
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKeys = {},
const std::vector<std::string>& filters = {}) {
VELOX_CHECK(!duckDbSql.empty(), "DuckDb sql is empty");
auto dataFilePath = TempFilePath::create();

writeToFile(
dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
std::vector<std::shared_ptr<ConnectorSplit>> splits;
splits.emplace_back(
makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));

std::unordered_set<std::string> partitionColumns;

for (const auto& partitionKey : partitionKeys) {
partitionColumns.insert(partitionKey.first);
}

auto planBuilder = std::make_unique<PlanBuilder>(pool_.get());
auto plan = PlanBuilder::TableScanBuilder(*planBuilder)
.outputType(rowType)
.subfieldFilters(filters)
.partitionKeys(partitionColumns)
.tableFormat("iceberg")
.endTableScan()
.planNode();
HiveConnectorTestBase::assertQuery(plan, splits, duckDbSql);
}

const static int rowCount = 20000;

protected:
std::shared_ptr<dwrf::Config> config_;
std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;

std::shared_ptr<ConnectorSplit> makeIcebergSplit(
const std::string& dataFilePath,
const std::vector<IcebergDeleteFile>& deleteFiles = {},
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKeys = {}) {
std::unordered_map<std::string, std::string> customSplitInfo;
customSplitInfo["table_format"] = "hive-iceberg";

auto file = filesystems::getFileSystem(dataFilePath, nullptr)
->openFileForRead(dataFilePath);
const int64_t fileSize = file->size();

return std::make_shared<HiveIcebergSplit>(
kHiveConnectorId,
dataFilePath,
fileFomat_,
0,
fileSize,
partitionKeys,
std::nullopt,
customSplitInfo,
nullptr,
/*cacheable=*/true,
deleteFiles);
}

private:
std::map<std::string, std::shared_ptr<TempFilePath>> writeDataFiles(
std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles) {
@@ -335,31 +399,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
return vectors;
}

std::shared_ptr<ConnectorSplit> makeIcebergSplit(
const std::string& dataFilePath,
const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
std::unordered_map<std::string, std::string> customSplitInfo;
customSplitInfo["table_format"] = "hive-iceberg";

auto file = filesystems::getFileSystem(dataFilePath, nullptr)
->openFileForRead(dataFilePath);
const int64_t fileSize = file->size();

return std::make_shared<HiveIcebergSplit>(
kHiveConnectorId,
dataFilePath,
fileFomat_,
0,
fileSize,
partitionKeys,
std::nullopt,
customSplitInfo,
nullptr,
/*cacheable=*/true,
deleteFiles);
}

std::string getDuckDBQuery(
const std::map<std::string, std::vector<int64_t>>& rowGroupSizesForFiles,
const std::unordered_map<
@@ -478,8 +517,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
}

dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF};
std::shared_ptr<dwrf::Config> config_;
std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;

RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})};
std::shared_ptr<IcebergMetadataColumn> pathColumn_ =
@@ -660,4 +697,76 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
assertMultipleSplits({}, 10, 3);
}

TEST_F(HiveIcebergTest, testPartitionedRead) {
RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})};
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
// The Iceberg API sets date partition values to days since epoch, so
// Velox does not need to convert them to days.
// Test a query on two partitions: ds=17627 (2018-04-06) and
// ds=17628 (2018-04-07).
std::vector<std::shared_ptr<ConnectorSplit>> splits;
std::vector<std::shared_ptr<TempFilePath>> dataFilePaths;
int32_t daysSinceEpoch;
for (int i = 0; i <= 1; ++i) {
std::vector<RowVectorPtr> dataVectors;
daysSinceEpoch = 17627 + i;
VectorPtr c0 = makeFlatVector<int64_t>((std::vector<int64_t>){i});
VectorPtr ds =
makeFlatVector<int32_t>((std::vector<int32_t>){daysSinceEpoch});
dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds}));

auto dataFilePath = TempFilePath::create();
dataFilePaths.push_back(dataFilePath);
writeToFile(
dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
partitionKeys["ds"] = std::to_string(daysSinceEpoch);
splits.emplace_back(
makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));
}

std::unordered_set<std::string> partitionColumns;

for (const auto& partitionKey : partitionKeys) {
partitionColumns.insert(partitionKey.first);
}

auto planBuilder = std::make_unique<PlanBuilder>(pool_.get());
auto plan = planBuilder->startTableScan()
.outputType(rowType)
.partitionKeys(partitionColumns)
.tableFormat("iceberg")
.endTableScan()
.planNode();
HiveConnectorTestBase::assertQuery(
plan,
splits,
"SELECT * FROM (VALUES (0, '2018-04-06'), (1, '2018-04-07'))",
0);

// Filter on a non-partition, non-date column.
std::vector<std::string> nonPartitionFilters = {"c0 = 1"};
plan = planBuilder->startTableScan()
.outputType(rowType)
.partitionKeys(partitionColumns)
.subfieldFilters(nonPartitionFilters)
.tableFormat("iceberg")
.endTableScan()
.planNode();
HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 1, '2018-04-07'");

// Test a filter on the date column read as a regular (non-partition)
// column.
std::vector<std::string> filters = {"ds = date'2018-04-06'"};
plan = planBuilder->startTableScan()
.outputType(rowType)
.subfieldFilters(filters)
.tableFormat("iceberg")
.endTableScan()
.planNode();
splits.clear();
for (const auto& dataFilePath : dataFilePaths) {
splits.emplace_back(makeIcebergSplit(dataFilePath->getPath(), {}, {}));
}

HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 0, '2018-04-06'");
}

} // namespace facebook::velox::connector::hive::iceberg