Skip to content

Commit

Permalink
fix(iceberg): Date partition value parse issue
Browse files Browse the repository at this point in the history
  • Loading branch information
nmahadevuni committed Jan 27, 2025
1 parent 54c6d9e commit 67b025c
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 23 deletions.
18 changes: 13 additions & 5 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,14 +632,20 @@ void configureRowReaderOptions(
namespace {

bool applyPartitionFilter(
SplitReader::TableFormat tableFormat,
const TypePtr& type,
const std::string& partitionValue,
common::Filter* filter) {
if (type->isDate()) {
const auto result = util::fromDateString(
StringView(partitionValue), util::ParseMode::kPrestoCast);
VELOX_CHECK(!result.hasError());
return applyFilter(*filter, result.value());
int32_t result = 0;
// Iceberg partition values are already in daysSinceEpoch, no need to
// convert.
if (tableFormat == SplitReader::TableFormat::kIceberg) {
result = std::stoi(partitionValue);
} else {
result = DATE()->toDays(static_cast<folly::StringPiece>(partitionValue));
}
return applyFilter(*filter, result);
}

switch (type->kind()) {
Expand Down Expand Up @@ -681,7 +687,8 @@ bool testFilters(
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeysHandle) {
partitionKeysHandle,
SplitReader::TableFormat tableFormat) {
const auto totalRows = reader->numberOfRows();
const auto& fileTypeWithId = reader->typeWithId();
const auto& rowType = reader->rowType();
Expand All @@ -699,6 +706,7 @@ bool testFilters(

// This is a non-null partition key
return applyPartitionFilter(
tableFormat,
handlesIter->second->dataType(),
iter->second.value(),
child->filter());
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/FileHandle.h"
#include "velox/connectors/hive/SplitReader.h"
#include "velox/dwio/common/BufferedInput.h"
#include "velox/dwio/common/Reader.h"

Expand Down Expand Up @@ -95,7 +96,8 @@ bool testFilters(
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKey,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeysHandle);
partitionKeysHandle,
SplitReader::TableFormat tableFormat = SplitReader::TableFormat::kHive);

std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
const FileHandle& fileHandle,
Expand Down
29 changes: 22 additions & 7 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,23 @@ VectorPtr newConstantFromString(
const std::optional<std::string>& value,
vector_size_t size,
velox::memory::MemoryPool* pool,
const std::string& sessionTimezone) {
const std::string& sessionTimezone,
const std::optional<SplitReader::TableFormat>& tableFormat) {
using T = typename TypeTraits<kind>::NativeType;
if (!value.has_value()) {
return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
}

if (type->isDate()) {
auto days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
int32_t days = 0;
// For Iceberg, the date partition values are already in daysSinceEpoch
// form.
if (tableFormat.has_value() &&
tableFormat.value() == SplitReader::TableFormat::kIceberg) {
days = std::stoi(value.value());
} else {
days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
}
return std::make_shared<ConstantVector<int32_t>>(
pool, size, false, type, std::move(days));
}
Expand Down Expand Up @@ -101,7 +110,8 @@ std::unique_ptr<SplitReader> SplitReader::create(
ioStats,
fileHandleFactory,
executor,
scanSpec));
scanSpec,
TableFormat::kHive));
}
}

Expand All @@ -116,12 +126,14 @@ SplitReader::SplitReader(
const std::shared_ptr<io::IoStatistics>& ioStats,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const std::shared_ptr<common::ScanSpec>& scanSpec)
const std::shared_ptr<common::ScanSpec>& scanSpec,
TableFormat tableFormat)
: hiveSplit_(hiveSplit),
hiveTableHandle_(hiveTableHandle),
partitionKeys_(partitionKeys),
connectorQueryCtx_(connectorQueryCtx),
hiveConfig_(hiveConfig),
tableFormat_(tableFormat),
readerOutputType_(readerOutputType),
ioStats_(ioStats),
fileHandleFactory_(fileHandleFactory),
Expand Down Expand Up @@ -272,7 +284,8 @@ bool SplitReader::filterOnStats(
baseReader_.get(),
hiveSplit_->filePath,
hiveSplit_->partitionKeys,
*partitionKeys_)) {
*partitionKeys_,
tableFormat_)) {
return true;
}
++runtimeStats.skippedSplits;
Expand Down Expand Up @@ -335,7 +348,8 @@ std::vector<TypePtr> SplitReader::adaptColumns(
iter->second,
1,
connectorQueryCtx_->memoryPool(),
connectorQueryCtx_->sessionTimezone());
connectorQueryCtx_->sessionTimezone(),
{});
childSpec->setConstantValue(constant);
} else if (
childSpec->columnType() == common::ScanSpec::ColumnType::kRegular) {
Expand Down Expand Up @@ -389,7 +403,8 @@ void SplitReader::setPartitionValue(
value,
1,
connectorQueryCtx_->memoryPool(),
connectorQueryCtx_->sessionTimezone());
connectorQueryCtx_->sessionTimezone(),
tableFormat_);
spec->setConstantValue(constant);
}

Expand Down
6 changes: 5 additions & 1 deletion velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class HiveConfig;

class SplitReader {
public:
enum class TableFormat { kHive, kIceberg };

static std::unique_ptr<SplitReader> create(
const std::shared_ptr<hive::HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveTableHandle>& hiveTableHandle,
Expand Down Expand Up @@ -109,7 +111,8 @@ class SplitReader {
const std::shared_ptr<io::IoStatistics>& ioStats,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const std::shared_ptr<common::ScanSpec>& scanSpec);
const std::shared_ptr<common::ScanSpec>& scanSpec,
TableFormat tableFormat);

/// Create the dwio::common::Reader object baseReader_, which will be used to
/// read the data file's metadata and schema
Expand Down Expand Up @@ -154,6 +157,7 @@ class SplitReader {
std::shared_ptr<HiveColumnHandle>>* const partitionKeys_;
const ConnectorQueryCtx* connectorQueryCtx_;
const std::shared_ptr<const HiveConfig> hiveConfig_;
TableFormat tableFormat_;

const RowTypePtr readerOutputType_;
const std::shared_ptr<io::IoStatistics> ioStats_;
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ IcebergSplitReader::IcebergSplitReader(
ioStats,
fileHandleFactory,
executor,
scanSpec),
scanSpec,
SplitReader::TableFormat::kIceberg),
baseReadOffset_(0),
splitOffset_(0),
deleteBitmap_(nullptr),
Expand Down
56 changes: 54 additions & 2 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
Expand Down Expand Up @@ -225,6 +226,35 @@ class HiveIcebergTest : public HiveConnectorTestBase {
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
}

void assertQuery(
RowTypePtr rowType,
const std::vector<RowVectorPtr>& dataVectors,
const std::string duckDbSql,
const std::unordered_map<std::string, std::optional<std::string>>
partitionKeys = {},
const std::vector<std::string> filters = {}) {
VELOX_CHECK(!duckDbSql.empty(), "DuckDb sql is empty");
auto dataFilePath = TempFilePath::create();

writeToFile(
dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
std::vector<std::shared_ptr<ConnectorSplit>> splits;
splits.emplace_back(
makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));

std::unordered_set<std::string> partitionColumns;

for (auto partitionKey : partitionKeys) {
partitionColumns.insert(partitionKey.first);
}

auto plan =
PlanBuilder(pool_.get())
.tableScan(rowType, filters, "", nullptr, {}, partitionColumns)
.planNode();
HiveConnectorTestBase::assertQuery(plan, splits, duckDbSql);
}

const static int rowCount = 20000;

private:
Expand Down Expand Up @@ -337,8 +367,9 @@ class HiveIcebergTest : public HiveConnectorTestBase {

std::shared_ptr<ConnectorSplit> makeIcebergSplit(
const std::string& dataFilePath,
const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
const std::vector<IcebergDeleteFile>& deleteFiles = {},
const std::unordered_map<std::string, std::optional<std::string>>
partitionKeys = {}) {
std::unordered_map<std::string, std::string> customSplitInfo;
customSplitInfo["table_format"] = "hive-iceberg";

Expand Down Expand Up @@ -660,4 +691,25 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
assertMultipleSplits({}, 10, 3);
}

TEST_F(HiveIcebergTest, testPartitionedRead) {
RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})};
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
// Iceberg API sets partition values for dates to daysSinceEpoch, so
// in velox, we do not need to convert it to days.
// Date = 2018-04-06, daysSinceEpoch = 17627
partitionKeys["ds"] = "17627";

std::vector<RowVectorPtr> dataVectors;
VectorPtr c0 = makeFlatVector<int64_t>((std::vector<int64_t>){1});
VectorPtr ds = makeFlatVector<int32_t>((std::vector<int32_t>){17627});
dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds}));

assertQuery(
rowType, dataVectors, "SELECT 1, '2018-04-06'", partitionKeys, {});

std::vector<std::string> filters = {"ds = date'2018-04-06'"};
assertQuery(
rowType, dataVectors, "SELECT 1, '2018-04-06'", partitionKeys, filters);
}

} // namespace facebook::velox::connector::hive::iceberg
13 changes: 8 additions & 5 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ PlanBuilder& PlanBuilder::tableScan(
const RowTypePtr& dataColumns,
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& assignments) {
std::shared_ptr<connector::ColumnHandle>>& assignments,
const std::unordered_set<std::string>& partitionKeys) {
return TableScanBuilder(*this)
.outputType(outputType)
.assignments(assignments)
.subfieldFilters(subfieldFilters)
.remainingFilter(remainingFilter)
.dataColumns(dataColumns)
.assignments(assignments)
.partitionKeys(partitionKeys)
.endTableScan();
}

Expand Down Expand Up @@ -170,13 +172,14 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) {
}

if (!hasAssignments) {
auto columnType = HiveColumnHandle::ColumnType::kRegular;
if (partitionKeys_.count(name) > 0) {
columnType = HiveColumnHandle::ColumnType::kPartitionKey;
}
assignments_.insert(
{name,
std::make_shared<HiveColumnHandle>(
hiveColumnName,
HiveColumnHandle::ColumnType::kRegular,
type,
type)});
hiveColumnName, columnType, type, type)});
}
}

Expand Down
11 changes: 10 additions & 1 deletion velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,16 @@ class PlanBuilder {
/// you define the output types only. See 'missingColumns' test in
/// 'TableScanTest'.
/// @param assignments Optional ColumnHandles.
/// @param partitionKeys Optional partition keys.
PlanBuilder& tableScan(
const RowTypePtr& outputType,
const std::vector<std::string>& subfieldFilters = {},
const std::string& remainingFilter = "",
const RowTypePtr& dataColumns = nullptr,
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& assignments = {});
std::shared_ptr<connector::ColumnHandle>>& assignments = {},
const std::unordered_set<std::string>& partitionKeys = {});

/// Add a TableScanNode to scan a Hive table.
///
Expand Down Expand Up @@ -277,6 +279,12 @@ class PlanBuilder {
return *this;
}

TableScanBuilder& partitionKeys(
std::unordered_set<std::string> partitionKeys) {
partitionKeys_ = std::move(partitionKeys);
return *this;
}

/// Stop the TableScanBuilder.
PlanBuilder& endTableScan() {
planBuilder_.planNode_ = build(planBuilder_.nextPlanNodeId());
Expand All @@ -298,6 +306,7 @@ class PlanBuilder {
std::shared_ptr<connector::ConnectorTableHandle> tableHandle_;
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
assignments_;
std::unordered_set<std::string> partitionKeys_;
};

/// Start a TableScanBuilder.
Expand Down

0 comments on commit 67b025c

Please sign in to comment.