Skip to content

Commit

Permalink
[native] Support converting Presto Iceberg Plan to Velox Plan
Browse files Browse the repository at this point in the history
Use Presto IcebergColumnHandle to create Velox HiveColumnHandle
Use Presto IcebergTableLayoutHandle/IcebergTableHandle to create Velox HiveTableHandle
  • Loading branch information
imjalpreet authored and yingsu00 committed Feb 19, 2024
1 parent 7d7df5c commit 3c7e01e
Showing 1 changed file with 127 additions and 52 deletions.
179 changes: 127 additions & 52 deletions presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ std::shared_ptr<connector::ColumnHandle> toColumnHandle(
toRequiredSubfields(hiveColumn->requiredSubfields));
}

if (auto icebergColumn =
dynamic_cast<const protocol::IcebergColumnHandle*>(column)) {
// TODO(imjalpreet): Modify 'hiveType' argument of the 'HiveColumnHandle'
// constructor similar to how Hive Connector is handling for bucketing
return std::make_shared<connector::hive::HiveColumnHandle>(
icebergColumn->columnIdentity.name,
toHiveColumnType(icebergColumn->columnType),
stringToType(icebergColumn->type, typeParser),
stringToType(icebergColumn->type, typeParser),
toRequiredSubfields(icebergColumn->requiredSubfields));
}

if (auto tpchColumn =
dynamic_cast<const protocol::TpchColumnHandle*>(column)) {
return std::make_shared<connector::tpch::TpchColumnHandle>(
Expand Down Expand Up @@ -856,6 +868,81 @@ TypePtr fieldNamesToLowerCase<TypeKind::ROW>(const TypePtr& type) {
return std::make_shared<RowType>(std::move(names), std::move(types));
}

std::shared_ptr<connector::ConnectorTableHandle> toHiveTableHandle(
const protocol::TupleDomain<protocol::Subfield>& domainPredicate,
const std::shared_ptr<protocol::RowExpression>& remainingPredicate,
bool isPushdownFilterEnabled,
const std::string& tableName,
const protocol::List<protocol::Column>& dataColumns,
const protocol::TableHandle& tableHandle,
const protocol::Map<protocol::String, protocol::String>& tableParameters,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser) {
connector::hive::SubfieldFilters subfieldFilters;
auto domains = domainPredicate.domains;
for (const auto& domain : *domains) {
auto filter = domain.second;
subfieldFilters[common::Subfield(domain.first)] =
toFilter(domain.second, exprConverter, typeParser);
}

auto remainingFilter = exprConverter.toVeloxExpr(remainingPredicate);
if (auto constant = std::dynamic_pointer_cast<const core::ConstantTypedExpr>(
remainingFilter)) {
bool value = constant->value().value<bool>();
VELOX_CHECK(value, "Unexpected always-false remaining predicate");

// Use null for always-true filter.
remainingFilter = nullptr;
}

RowTypePtr finalDataColumns;
if (!dataColumns.empty()) {
std::vector<std::string> names;
std::vector<TypePtr> types;
velox::type::fbhive::HiveTypeParser typeParser;
names.reserve(dataColumns.size());
types.reserve(dataColumns.size());
for (auto& column : dataColumns) {
std::string name = column.name;
folly::toLowerAscii(name);
names.emplace_back(std::move(name));
auto parsedType = typeParser.parse(column.type);
// The type from the metastore may have upper case letters
// in field names, convert them all to lower case to be
// compatible with Presto.
types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH(
fieldNamesToLowerCase, parsedType->kind(), parsedType));
}
finalDataColumns = ROW(std::move(names), std::move(types));
}

if (tableParameters.empty()) {
return std::make_shared<connector::hive::HiveTableHandle>(
tableHandle.connectorId,
tableName,
isPushdownFilterEnabled,
std::move(subfieldFilters),
remainingFilter,
finalDataColumns);
}

std::unordered_map<std::string, std::string> finalTableParameters = {};
finalTableParameters.reserve(tableParameters.size());
for (const auto& [key, value] : tableParameters) {
finalTableParameters[key] = value;
}

return std::make_shared<connector::hive::HiveTableHandle>(
tableHandle.connectorId,
tableName,
isPushdownFilterEnabled,
std::move(subfieldFilters),
remainingFilter,
finalDataColumns,
finalTableParameters);
}

std::shared_ptr<connector::ConnectorTableHandle> toConnectorTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
Expand All @@ -869,47 +956,6 @@ std::shared_ptr<connector::ConnectorTableHandle> toConnectorTableHandle(
partitionColumns.emplace(entry.name, toColumnHandle(&entry, typeParser));
}

connector::hive::SubfieldFilters subfieldFilters;
auto domains = hiveLayout->domainPredicate.domains;
for (const auto& domain : *domains) {
auto filter = domain.second;
subfieldFilters[common::Subfield(domain.first)] =
toFilter(domain.second, exprConverter, typeParser);
}

auto remainingFilter =
exprConverter.toVeloxExpr(hiveLayout->remainingPredicate);
if (auto constant =
std::dynamic_pointer_cast<const core::ConstantTypedExpr>(
remainingFilter)) {
bool value = constant->value().value<bool>();
VELOX_CHECK(value, "Unexpected always-false remaining predicate");

// Use null for always-true filter.
remainingFilter = nullptr;
}

RowTypePtr dataColumns;
if (!hiveLayout->dataColumns.empty()) {
std::vector<std::string> names;
std::vector<TypePtr> types;
velox::type::fbhive::HiveTypeParser typeParser;
names.reserve(hiveLayout->dataColumns.size());
types.reserve(hiveLayout->dataColumns.size());
for (auto& column : hiveLayout->dataColumns) {
std::string name = column.name;
folly::toLowerAscii(name);
names.emplace_back(std::move(name));
auto parsedType = typeParser.parse(column.type);
// The type from the metastore may have upper case letters
// in field names, convert them all to lower case to be
// compatible with Presto.
types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH(
fieldNamesToLowerCase, parsedType->kind(), parsedType));
}
dataColumns = ROW(std::move(names), std::move(types));
}

auto hiveTableHandle =
std::dynamic_pointer_cast<const protocol::HiveTableHandle>(
tableHandle.connectorHandle);
Expand All @@ -921,20 +967,49 @@ std::shared_ptr<connector::ConnectorTableHandle> toConnectorTableHandle(
: fmt::format(
"{}.{}", hiveTableHandle->schemaName, hiveTableHandle->tableName);

std::unordered_map<std::string, std::string> tableParameters;
tableParameters.reserve(hiveLayout->tableParameters.size());
for (const auto& [key, value] : hiveLayout->tableParameters) {
tableParameters[key] = value;
return toHiveTableHandle(
hiveLayout->domainPredicate,
hiveLayout->remainingPredicate,
hiveLayout->pushdownFilterEnabled,
tableName,
hiveLayout->dataColumns,
tableHandle,
hiveLayout->tableParameters,
exprConverter,
typeParser);
}

if (auto icebergLayout =
std::dynamic_pointer_cast<const protocol::IcebergTableLayoutHandle>(
tableHandle.connectorTableLayout)) {
for (const auto& entry : icebergLayout->partitionColumns) {
partitionColumns.emplace(
entry.columnIdentity.name, toColumnHandle(&entry, typeParser));
}

return std::make_shared<connector::hive::HiveTableHandle>(
tableHandle.connectorId,
auto icebergTableHandle =
std::dynamic_pointer_cast<const protocol::IcebergTableHandle>(
tableHandle.connectorHandle);
VELOX_CHECK_NOT_NULL(icebergTableHandle);

// Use fully qualified name if available.
std::string tableName = icebergTableHandle->schemaName.empty()
? icebergTableHandle->icebergTableName.tableName
: fmt::format(
"{}.{}",
icebergTableHandle->schemaName,
icebergTableHandle->icebergTableName.tableName);

return toHiveTableHandle(
icebergLayout->domainPredicate,
icebergLayout->remainingPredicate,
icebergLayout->pushdownFilterEnabled,
tableName,
hiveLayout->pushdownFilterEnabled,
std::move(subfieldFilters),
remainingFilter,
dataColumns,
tableParameters);
icebergLayout->dataColumns,
tableHandle,
{},
exprConverter,
typeParser);
}

if (auto tpchLayout =
Expand Down

0 comments on commit 3c7e01e

Please sign in to comment.