From 68b26f38fca1ed250e22011c005e1d5b7605c04d Mon Sep 17 00:00:00 2001 From: Christian Zentgraf Date: Wed, 27 Nov 2024 11:42:19 -0500 Subject: [PATCH] [native] Use Velox namespace explicitly in PrestoServer.cpp The namespace used for most Velox APIs was inconsistent in the PrestoServer.cpp file. This PR makes Velox API usage consistent. This also can avoid issues with ambiguous names if somehow another API introduces a similar naming convention but in a different namespace that could be used by Pretissimo. --- .../presto_cpp/main/PrestoServer.cpp | 114 +++++++++--------- 1 file changed, 60 insertions(+), 54 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index fbdbcb23564aa..3cedadc480f2b 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -77,7 +77,7 @@ #include #endif -using namespace facebook::velox; +using namespace facebook; namespace facebook::presto { namespace { @@ -105,7 +105,7 @@ void enableChecksum() { velox::exec::OutputBufferManager::getInstance().lock()->setListenerFactory( []() { return std::make_unique< - serializer::presto::PrestoOutputStreamListener>(); + velox::serializer::presto::PrestoOutputStreamListener>(); }); } @@ -238,7 +238,7 @@ void PrestoServer::run() { address_ = fmt::format("[{}]", address_); } nodeLocation_ = nodeConfig->nodeLocation(); - } catch (const VeloxUserError& e) { + } catch (const velox::VeloxUserError& e) { PRESTO_STARTUP_LOG(ERROR) << "Failed to start server due to " << e.what(); exit(EXIT_FAILURE); } @@ -253,8 +253,9 @@ void PrestoServer::run() { // Register Velox connector factory for iceberg. // The iceberg catalog is handled by the hive connector factory. - connector::registerConnectorFactory( - std::make_shared("iceberg")); + velox::connector::registerConnectorFactory( + std::make_shared( + "iceberg")); registerPrestoToVeloxConnector( std::make_unique("hive")); @@ -458,7 +459,7 @@ void PrestoServer::run() { const std::string& taskId, int destination, std::shared_ptr queue, - memory::MemoryPool* pool) { + velox::memory::MemoryPool* pool) { return PrestoExchangeSource::create( taskId, destination, @@ -470,11 +471,11 @@ void PrestoServer::run() { sslContext_); }); - facebook::velox::exec::ExchangeSource::registerFactory( + velox::exec::ExchangeSource::registerFactory( operators::UnsafeRowExchangeSource::createExchangeSource); // Batch broadcast exchange source. - facebook::velox::exec::ExchangeSource::registerFactory( + velox::exec::ExchangeSource::registerFactory( operators::BroadcastExchangeSource::createExchangeSource); pool_ = @@ -520,13 +521,13 @@ void PrestoServer::run() { if (systemConfig->enableVeloxTaskLogging()) { if (auto listener = getTaskListener()) { - exec::registerTaskListener(listener); + velox::exec::registerTaskListener(listener); } } if (systemConfig->enableVeloxExprSetLogging()) { if (auto listener = getExprSetListener()) { - exec::registerExprSetListener(listener); + velox::exec::registerExprSetListener(listener); } } prestoServerOperations_ = @@ -567,7 +568,7 @@ void PrestoServer::run() { PRESTO_STARTUP_LOG(INFO) << "Starting all periodic tasks"; auto* memoryAllocator = velox::memory::memoryManager()->allocator(); - auto* asyncDataCache = cache::AsyncDataCache::getInstance(); + auto* asyncDataCache = velox::cache::AsyncDataCache::getInstance(); periodicTaskManager_ = std::make_unique( driverExecutor_.get(), spillerExecutor_.get(), @@ -782,7 +783,7 @@ void PrestoServer::initializeThreadPools() { } } -std::unique_ptr PrestoServer::setupSsdCache() { +std::unique_ptr PrestoServer::setupSsdCache() { VELOX_CHECK_NULL(cacheExecutor_); auto* systemConfig = SystemConfig::instance(); if (systemConfig->asyncCacheSsdGb() == 0) { @@ -791,7 +792,7 @@ std::unique_ptr PrestoServer::setupSsdCache() { constexpr int32_t kNumSsdShards = 16; cacheExecutor_ = std::make_unique(kNumSsdShards); - cache::SsdCache::Config cacheConfig( + velox::cache::SsdCache::Config cacheConfig( systemConfig->asyncCacheSsdPath(), systemConfig->asyncCacheSsdGb() << 30, kNumSsdShards, @@ -802,7 +803,7 @@ std::unique_ptr PrestoServer::setupSsdCache() { systemConfig->ssdCacheReadVerificationEnabled()); PRESTO_STARTUP_LOG(INFO) << "Initializing SSD cache with " << cacheConfig.toString(); - return std::make_unique(cacheConfig); + return std::make_unique(cacheConfig); } void PrestoServer::initializeVeloxMemory() { @@ -811,7 +812,7 @@ void PrestoServer::initializeVeloxMemory() { PRESTO_STARTUP_LOG(INFO) << "Starting with node memory " << memoryGb << "GB"; // Set up velox memory manager. - memory::MemoryManagerOptions options; + velox::memory::MemoryManagerOptions options; options.allocatorCapacity = memoryGb << 30; if (systemConfig->useMmapAllocator()) { options.useMmapAllocator = true; @@ -865,34 +866,38 @@ void PrestoServer::initializeVeloxMemory() { {std::string(SharedArbitratorConfig::kCheckUsageLeak), folly::to(systemConfig->enableMemoryLeakCheck())}}; } - memory::initializeMemoryManager(options); + velox::memory::initializeMemoryManager(options); PRESTO_STARTUP_LOG(INFO) << "Memory manager has been setup: " - << memory::memoryManager()->toString(); + << velox::memory::memoryManager()->toString(); if (systemConfig->asyncDataCacheEnabled()) { - std::unique_ptr ssd = setupSsdCache(); + std::unique_ptr ssd = setupSsdCache(); std::string cacheStr = ssd == nullptr ? "AsyncDataCache" : "AsyncDataCache with SSD"; - cache::AsyncDataCache::Options cacheOptions{ + velox::cache::AsyncDataCache::Options cacheOptions{ systemConfig->asyncCacheMaxSsdWriteRatio(), systemConfig->asyncCacheSsdSavableRatio(), systemConfig->asyncCacheMinSsdSavableBytes()}; - cache_ = cache::AsyncDataCache::create( - memory::memoryManager()->allocator(), std::move(ssd), cacheOptions); - cache::AsyncDataCache::setInstance(cache_.get()); + cache_ = velox::cache::AsyncDataCache::create( + velox::memory::memoryManager()->allocator(), + std::move(ssd), + cacheOptions); + velox::cache::AsyncDataCache::setInstance(cache_.get()); PRESTO_STARTUP_LOG(INFO) << cacheStr << " has been setup"; if (isCacheTtlEnabled()) { - cache::CacheTTLController::create(*cache_); + velox::cache::CacheTTLController::create(*cache_); PRESTO_STARTUP_LOG(INFO) << fmt::format( "Cache TTL is enabled, with TTL {} enforced every {}.", - succinctMillis(std::chrono::duration_cast( - systemConfig->cacheVeloxTtlThreshold()) - .count()), - succinctMillis(std::chrono::duration_cast( - systemConfig->cacheVeloxTtlCheckInterval()) - .count())); + velox::succinctMillis( + std::chrono::duration_cast( + systemConfig->cacheVeloxTtlThreshold()) + .count()), + velox::succinctMillis( + std::chrono::duration_cast( + systemConfig->cacheVeloxTtlCheckInterval()) + .count())); } } else { VELOX_CHECK_EQ( @@ -1112,18 +1117,18 @@ PrestoServer::getAdditionalHttpServerFilters() { void PrestoServer::registerConnectorFactories() { // These checks for connector factories can be removed after we remove the // registrations from the Velox library. - if (!connector::hasConnectorFactory( - connector::hive::HiveConnectorFactory::kHiveConnectorName)) { - connector::registerConnectorFactory( - std::make_shared()); - connector::registerConnectorFactory( - std::make_shared( + if (!velox::connector::hasConnectorFactory( + velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)) { + velox::connector::registerConnectorFactory( + std::make_shared()); + velox::connector::registerConnectorFactory( + std::make_shared( kHiveHadoop2ConnectorName)); } - if (!connector::hasConnectorFactory( - connector::tpch::TpchConnectorFactory::kTpchConnectorName)) { - connector::registerConnectorFactory( - std::make_shared()); + if (!velox::connector::hasConnectorFactory( + velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName)) { + velox::connector::registerConnectorFactory( + std::make_shared()); } } @@ -1173,7 +1178,7 @@ std::vector PrestoServer::registerConnectors( getPrestoToVeloxConnector(connectorName); std::shared_ptr connector = - facebook::velox::connector::getConnectorFactory(connectorName) + velox::connector::getConnectorFactory(connectorName) ->newConnector( catalogName, std::move(properties), @@ -1195,7 +1200,7 @@ void PrestoServer::registerSystemConnector() { void PrestoServer::unregisterConnectors() { PRESTO_SHUTDOWN_LOG(INFO) << "Unregistering connectors"; - auto connectors = facebook::velox::connector::getAllConnectors(); + auto connectors = velox::connector::getAllConnectors(); if (connectors.empty()) { PRESTO_SHUTDOWN_LOG(INFO) << "No connectors to unregister"; return; @@ -1204,7 +1209,7 @@ void PrestoServer::unregisterConnectors() { PRESTO_SHUTDOWN_LOG(INFO) << "Unregistering " << connectors.size() << " connectors"; for (const auto& connectorEntry : connectors) { - if (facebook::velox::connector::unregisterConnector(connectorEntry.first)) { + if (velox::connector::unregisterConnector(connectorEntry.first)) { PRESTO_SHUTDOWN_LOG(INFO) << "Unregistered connector: " << connectorEntry.first; } else { @@ -1213,7 +1218,7 @@ void PrestoServer::unregisterConnectors() { } } - facebook::velox::connector::unregisterConnector("$system@system"); + velox::connector::unregisterConnector("$system@system"); PRESTO_SHUTDOWN_LOG(INFO) << "Unregistered " << connectors.size() << " connectors"; } @@ -1225,18 +1230,17 @@ void PrestoServer::registerShuffleInterfaceFactories() { } void PrestoServer::registerCustomOperators() { - facebook::velox::exec::Operator::registerOperator( + velox::exec::Operator::registerOperator( std::make_unique()); - facebook::velox::exec::Operator::registerOperator( - std::make_unique()); - facebook::velox::exec::Operator::registerOperator( + velox::exec::Operator::registerOperator( + std::make_unique()); + velox::exec::Operator::registerOperator( std::make_unique()); // Todo - Split Presto & Presto-on-Spark server into different classes // which will allow server specific operator registration. - facebook::velox::exec::Operator::registerOperator( - std::make_unique< - facebook::presto::operators::BroadcastWriteTranslator>()); + velox::exec::Operator::registerOperator( + std::make_unique()); } void PrestoServer::registerFunctions() { @@ -1285,13 +1289,15 @@ void PrestoServer::registerVectorSerdes() { if (!velox::isRegisteredVectorSerde()) { velox::serializer::presto::PrestoVectorSerde::registerVectorSerde(); } - if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kPresto)) { + if (!velox::isRegisteredNamedVectorSerde(velox::VectorSerde::Kind::kPresto)) { velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde(); } - if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kCompactRow)) { + if (!velox::isRegisteredNamedVectorSerde( + velox::VectorSerde::Kind::kCompactRow)) { velox::serializer::CompactRowVectorSerde::registerNamedVectorSerde(); } - if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kUnsafeRow)) { + if (!velox::isRegisteredNamedVectorSerde( + velox::VectorSerde::Kind::kUnsafeRow)) { velox::serializer::spark::UnsafeRowVectorSerde::registerNamedVectorSerde(); } } @@ -1332,7 +1338,7 @@ void PrestoServer::unregisterFileReadersAndWriters() { void PrestoServer::registerStatsCounters() { registerPrestoMetrics(); - registerVeloxMetrics(); + velox::registerVeloxMetrics(); } std::string PrestoServer::getLocalIp() const {