From 3c1b9a30a35e5bdbc4c8b1b315a544e491d768b8 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Sat, 28 Dec 2024 23:02:27 -0800 Subject: [PATCH] Pass cacheable flag in connector split (#24303) Summary: Pull Request resolved: https://github.com/prestodb/presto/pull/24303 Differential Revision: D67688385 --- .../presto_cpp/main/SystemConnector.cpp | 8 +++++--- .../presto_cpp/main/SystemConnector.h | 3 ++- .../presto_cpp/main/SystemSplit.h | 5 +++-- .../main/types/PrestoToVeloxConnector.cpp | 16 ++++++++++++---- .../main/types/PrestoToVeloxConnector.h | 12 ++++++++---- .../presto_cpp/main/types/PrestoToVeloxSplit.cpp | 4 +++- presto-native-execution/velox | 2 +- 7 files changed, 34 insertions(+), 16 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/SystemConnector.cpp b/presto-native-execution/presto_cpp/main/SystemConnector.cpp index 9de215e014cac..dc8790bf70b38 100644 --- a/presto-native-execution/presto_cpp/main/SystemConnector.cpp +++ b/presto-native-execution/presto_cpp/main/SystemConnector.cpp @@ -350,14 +350,16 @@ std::optional SystemDataSource::next( std::unique_ptr SystemPrestoToVeloxConnector::toVeloxSplit( const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* const connectorSplit) const { + const protocol::ConnectorSplit* const connectorSplit, + const protocol::SplitContext* splitContext) const { auto systemSplit = dynamic_cast(connectorSplit); VELOX_CHECK_NOT_NULL( systemSplit, "Unexpected split type {}", connectorSplit->_type); return std::make_unique( catalogId, systemSplit->tableHandle.schemaName, - systemSplit->tableHandle.tableName); + systemSplit->tableHandle.tableName, + splitContext->cacheable); } std::unique_ptr @@ -393,4 +395,4 @@ std::unique_ptr SystemPrestoToVeloxConnector::createConnectorProtocol() const { return std::make_unique(); } -} // namespace facebook::presto \ No newline at end of file +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/SystemConnector.h b/presto-native-execution/presto_cpp/main/SystemConnector.h index b467cf25676b6..52d9df595f736 100644 --- a/presto-native-execution/presto_cpp/main/SystemConnector.h +++ b/presto-native-execution/presto_cpp/main/SystemConnector.h @@ -184,7 +184,8 @@ class SystemPrestoToVeloxConnector final : public PrestoToVeloxConnector { std::unique_ptr toVeloxSplit( const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* connectorSplit) const final; + const protocol::ConnectorSplit* connectorSplit, + const protocol::SplitContext* splitContext) const final; std::unique_ptr toVeloxColumnHandle( const protocol::ColumnHandle* column, diff --git a/presto-native-execution/presto_cpp/main/SystemSplit.h b/presto-native-execution/presto_cpp/main/SystemSplit.h index f1af48d6dfc4f..fab7a00422704 100644 --- a/presto-native-execution/presto_cpp/main/SystemSplit.h +++ b/presto-native-execution/presto_cpp/main/SystemSplit.h @@ -21,8 +21,9 @@ struct SystemSplit : public velox::connector::ConnectorSplit { explicit SystemSplit( const std::string& connectorId, const std::string& schemaName, - const std::string& tableName) - : ConnectorSplit(connectorId), + const std::string& tableName, + bool cacheable) + : ConnectorSplit(connectorId, /*splitWeight=*/0, cacheable), schemaName_(schemaName), tableName_(tableName) {} diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp index 45ceb97af58a4..2f69374d4a696 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp @@ -1097,7 +1097,8 @@ velox::connector::hive::iceberg::FileContent toVeloxFileContent( std::unique_ptr HivePrestoToVeloxConnector::toVeloxSplit( const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* const connectorSplit) const { + const protocol::ConnectorSplit* connectorSplit, + const protocol::SplitContext* splitContext) const { auto hiveSplit = dynamic_cast(connectorSplit); VELOX_CHECK_NOT_NULL( @@ -1147,6 +1148,7 @@ HivePrestoToVeloxConnector::toVeloxSplit( extraFileInfo, serdeParameters, hiveSplit->splitWeight, + splitContext->cacheable, infoColumns); if (hiveSplit->bucketConversion) { VELOX_CHECK_NOT_NULL(hiveSplit->tableBucketNumber); @@ -1331,7 +1333,8 @@ HivePrestoToVeloxConnector::createConnectorProtocol() const { std::unique_ptr IcebergPrestoToVeloxConnector::toVeloxSplit( const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* const connectorSplit) const { + const protocol::ConnectorSplit* connectorSplit, + const protocol::SplitContext* splitContext) const { auto icebergSplit = dynamic_cast(connectorSplit); VELOX_CHECK_NOT_NULL( @@ -1386,6 +1389,7 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( std::nullopt, customSplitInfo, nullptr, + splitContext->cacheable, deletes, infoColumns); } @@ -1482,13 +1486,17 @@ IcebergPrestoToVeloxConnector::createConnectorProtocol() const { std::unique_ptr TpchPrestoToVeloxConnector::toVeloxSplit( const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* const connectorSplit) const { + const protocol::ConnectorSplit* connectorSplit, + const protocol::SplitContext* splitContext) const { auto tpchSplit = dynamic_cast(connectorSplit); VELOX_CHECK_NOT_NULL( tpchSplit, "Unexpected split type {}", connectorSplit->_type); return std::make_unique( - catalogId, tpchSplit->totalParts, tpchSplit->partNumber); + catalogId, + splitContext->cacheable, + tpchSplit->totalParts, + tpchSplit->partNumber); } std::unique_ptr diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h index 6d80751778e6a..eb33dfb54ca1d 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h @@ -45,7 +45,8 @@ class PrestoToVeloxConnector { [[nodiscard]] virtual std::unique_ptr toVeloxSplit( const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* connectorSplit) const = 0; + const protocol::ConnectorSplit* connectorSplit, + const protocol::SplitContext* splitContext) const = 0; [[nodiscard]] virtual std::unique_ptr toVeloxColumnHandle( @@ -115,7 +116,8 @@ class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector { std::unique_ptr toVeloxSplit( const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* connectorSplit) const final; + const protocol::ConnectorSplit* connectorSplit, + const protocol::SplitContext* splitContext) const final; std::unique_ptr toVeloxColumnHandle( const protocol::ColumnHandle* column, @@ -166,7 +168,8 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector { std::unique_ptr toVeloxSplit( const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* connectorSplit) const final; + const protocol::ConnectorSplit* connectorSplit, + const protocol::SplitContext* splitContext) const final; std::unique_ptr toVeloxColumnHandle( const protocol::ColumnHandle* column, @@ -192,7 +195,8 @@ class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector { std::unique_ptr toVeloxSplit( const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* connectorSplit) const final; + const protocol::ConnectorSplit* connectorSplit, + const protocol::SplitContext* splitContext) const final; std::unique_ptr toVeloxColumnHandle( const protocol::ColumnHandle* column, diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxSplit.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxSplit.cpp index 47522c7842098..1d11be2e904fc 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxSplit.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxSplit.cpp @@ -40,7 +40,9 @@ velox::exec::Split toVeloxSplit( auto& connector = getPrestoToVeloxConnector(connectorSplit->_type); auto veloxSplit = connector.toVeloxSplit( - scheduledSplit.split.connectorId, connectorSplit.get()); + scheduledSplit.split.connectorId, + connectorSplit.get(), + &scheduledSplit.split.splitContext); return velox::exec::Split(std::move(veloxSplit), splitGroupId); } diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 87c558e094043..50d4d64c51c37 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 87c558e094043c5451659c59d38ffc1bd6de781a +Subproject commit 50d4d64c51c370c2e0276da4ab0369ecbcb4797f