diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index fbdbcb23564aa..3cedadc480f2b 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -77,7 +77,7 @@ #include #endif -using namespace facebook::velox; +using namespace facebook; namespace facebook::presto { namespace { @@ -105,7 +105,7 @@ void enableChecksum() { velox::exec::OutputBufferManager::getInstance().lock()->setListenerFactory( []() { return std::make_unique< - serializer::presto::PrestoOutputStreamListener>(); + velox::serializer::presto::PrestoOutputStreamListener>(); }); } @@ -238,7 +238,7 @@ void PrestoServer::run() { address_ = fmt::format("[{}]", address_); } nodeLocation_ = nodeConfig->nodeLocation(); - } catch (const VeloxUserError& e) { + } catch (const velox::VeloxUserError& e) { PRESTO_STARTUP_LOG(ERROR) << "Failed to start server due to " << e.what(); exit(EXIT_FAILURE); } @@ -253,8 +253,9 @@ void PrestoServer::run() { // Register Velox connector factory for iceberg. // The iceberg catalog is handled by the hive connector factory. - connector::registerConnectorFactory( - std::make_shared("iceberg")); + velox::connector::registerConnectorFactory( + std::make_shared( + "iceberg")); registerPrestoToVeloxConnector( std::make_unique("hive")); @@ -458,7 +459,7 @@ void PrestoServer::run() { const std::string& taskId, int destination, std::shared_ptr queue, - memory::MemoryPool* pool) { + velox::memory::MemoryPool* pool) { return PrestoExchangeSource::create( taskId, destination, @@ -470,11 +471,11 @@ void PrestoServer::run() { sslContext_); }); - facebook::velox::exec::ExchangeSource::registerFactory( + velox::exec::ExchangeSource::registerFactory( operators::UnsafeRowExchangeSource::createExchangeSource); // Batch broadcast exchange source. - facebook::velox::exec::ExchangeSource::registerFactory( + velox::exec::ExchangeSource::registerFactory( operators::BroadcastExchangeSource::createExchangeSource); pool_ = @@ -520,13 +521,13 @@ void PrestoServer::run() { if (systemConfig->enableVeloxTaskLogging()) { if (auto listener = getTaskListener()) { - exec::registerTaskListener(listener); + velox::exec::registerTaskListener(listener); } } if (systemConfig->enableVeloxExprSetLogging()) { if (auto listener = getExprSetListener()) { - exec::registerExprSetListener(listener); + velox::exec::registerExprSetListener(listener); } } prestoServerOperations_ = @@ -567,7 +568,7 @@ void PrestoServer::run() { PRESTO_STARTUP_LOG(INFO) << "Starting all periodic tasks"; auto* memoryAllocator = velox::memory::memoryManager()->allocator(); - auto* asyncDataCache = cache::AsyncDataCache::getInstance(); + auto* asyncDataCache = velox::cache::AsyncDataCache::getInstance(); periodicTaskManager_ = std::make_unique( driverExecutor_.get(), spillerExecutor_.get(), @@ -782,7 +783,7 @@ void PrestoServer::initializeThreadPools() { } } -std::unique_ptr PrestoServer::setupSsdCache() { +std::unique_ptr PrestoServer::setupSsdCache() { VELOX_CHECK_NULL(cacheExecutor_); auto* systemConfig = SystemConfig::instance(); if (systemConfig->asyncCacheSsdGb() == 0) { @@ -791,7 +792,7 @@ std::unique_ptr PrestoServer::setupSsdCache() { constexpr int32_t kNumSsdShards = 16; cacheExecutor_ = std::make_unique(kNumSsdShards); - cache::SsdCache::Config cacheConfig( + velox::cache::SsdCache::Config cacheConfig( systemConfig->asyncCacheSsdPath(), systemConfig->asyncCacheSsdGb() << 30, kNumSsdShards, @@ -802,7 +803,7 @@ std::unique_ptr PrestoServer::setupSsdCache() { systemConfig->ssdCacheReadVerificationEnabled()); PRESTO_STARTUP_LOG(INFO) << "Initializing SSD cache with " << cacheConfig.toString(); - return std::make_unique(cacheConfig); + return std::make_unique(cacheConfig); } void PrestoServer::initializeVeloxMemory() { @@ -811,7 +812,7 @@ void PrestoServer::initializeVeloxMemory() { PRESTO_STARTUP_LOG(INFO) << "Starting with node memory " << memoryGb << "GB"; // Set up velox memory manager. - memory::MemoryManagerOptions options; + velox::memory::MemoryManagerOptions options; options.allocatorCapacity = memoryGb << 30; if (systemConfig->useMmapAllocator()) { options.useMmapAllocator = true; @@ -865,34 +866,38 @@ void PrestoServer::initializeVeloxMemory() { {std::string(SharedArbitratorConfig::kCheckUsageLeak), folly::to(systemConfig->enableMemoryLeakCheck())}}; } - memory::initializeMemoryManager(options); + velox::memory::initializeMemoryManager(options); PRESTO_STARTUP_LOG(INFO) << "Memory manager has been setup: " - << memory::memoryManager()->toString(); + << velox::memory::memoryManager()->toString(); if (systemConfig->asyncDataCacheEnabled()) { - std::unique_ptr ssd = setupSsdCache(); + std::unique_ptr ssd = setupSsdCache(); std::string cacheStr = ssd == nullptr ? "AsyncDataCache" : "AsyncDataCache with SSD"; - cache::AsyncDataCache::Options cacheOptions{ + velox::cache::AsyncDataCache::Options cacheOptions{ systemConfig->asyncCacheMaxSsdWriteRatio(), systemConfig->asyncCacheSsdSavableRatio(), systemConfig->asyncCacheMinSsdSavableBytes()}; - cache_ = cache::AsyncDataCache::create( - memory::memoryManager()->allocator(), std::move(ssd), cacheOptions); - cache::AsyncDataCache::setInstance(cache_.get()); + cache_ = velox::cache::AsyncDataCache::create( + velox::memory::memoryManager()->allocator(), + std::move(ssd), + cacheOptions); + velox::cache::AsyncDataCache::setInstance(cache_.get()); PRESTO_STARTUP_LOG(INFO) << cacheStr << " has been setup"; if (isCacheTtlEnabled()) { - cache::CacheTTLController::create(*cache_); + velox::cache::CacheTTLController::create(*cache_); PRESTO_STARTUP_LOG(INFO) << fmt::format( "Cache TTL is enabled, with TTL {} enforced every {}.", - succinctMillis(std::chrono::duration_cast( - systemConfig->cacheVeloxTtlThreshold()) - .count()), - succinctMillis(std::chrono::duration_cast( - systemConfig->cacheVeloxTtlCheckInterval()) - .count())); + velox::succinctMillis( + std::chrono::duration_cast( + systemConfig->cacheVeloxTtlThreshold()) + .count()), + velox::succinctMillis( + std::chrono::duration_cast( + systemConfig->cacheVeloxTtlCheckInterval()) + .count())); } } else { VELOX_CHECK_EQ( @@ -1112,18 +1117,18 @@ PrestoServer::getAdditionalHttpServerFilters() { void PrestoServer::registerConnectorFactories() { // These checks for connector factories can be removed after we remove the // registrations from the Velox library. - if (!connector::hasConnectorFactory( - connector::hive::HiveConnectorFactory::kHiveConnectorName)) { - connector::registerConnectorFactory( - std::make_shared()); - connector::registerConnectorFactory( - std::make_shared( + if (!velox::connector::hasConnectorFactory( + velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)) { + velox::connector::registerConnectorFactory( + std::make_shared()); + velox::connector::registerConnectorFactory( + std::make_shared( kHiveHadoop2ConnectorName)); } - if (!connector::hasConnectorFactory( - connector::tpch::TpchConnectorFactory::kTpchConnectorName)) { - connector::registerConnectorFactory( - std::make_shared()); + if (!velox::connector::hasConnectorFactory( + velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName)) { + velox::connector::registerConnectorFactory( + std::make_shared()); } } @@ -1173,7 +1178,7 @@ std::vector PrestoServer::registerConnectors( getPrestoToVeloxConnector(connectorName); std::shared_ptr connector = - facebook::velox::connector::getConnectorFactory(connectorName) + velox::connector::getConnectorFactory(connectorName) ->newConnector( catalogName, std::move(properties), @@ -1195,7 +1200,7 @@ void PrestoServer::registerSystemConnector() { void PrestoServer::unregisterConnectors() { PRESTO_SHUTDOWN_LOG(INFO) << "Unregistering connectors"; - auto connectors = facebook::velox::connector::getAllConnectors(); + auto connectors = velox::connector::getAllConnectors(); if (connectors.empty()) { PRESTO_SHUTDOWN_LOG(INFO) << "No connectors to unregister"; return; @@ -1204,7 +1209,7 @@ void PrestoServer::unregisterConnectors() { PRESTO_SHUTDOWN_LOG(INFO) << "Unregistering " << connectors.size() << " connectors"; for (const auto& connectorEntry : connectors) { - if (facebook::velox::connector::unregisterConnector(connectorEntry.first)) { + if (velox::connector::unregisterConnector(connectorEntry.first)) { PRESTO_SHUTDOWN_LOG(INFO) << "Unregistered connector: " << connectorEntry.first; } else { @@ -1213,7 +1218,7 @@ void PrestoServer::unregisterConnectors() { } } - facebook::velox::connector::unregisterConnector("$system@system"); + velox::connector::unregisterConnector("$system@system"); PRESTO_SHUTDOWN_LOG(INFO) << "Unregistered " << connectors.size() << " connectors"; } @@ -1225,18 +1230,17 @@ void PrestoServer::registerShuffleInterfaceFactories() { } void PrestoServer::registerCustomOperators() { - facebook::velox::exec::Operator::registerOperator( + velox::exec::Operator::registerOperator( std::make_unique()); - facebook::velox::exec::Operator::registerOperator( - std::make_unique()); - facebook::velox::exec::Operator::registerOperator( + velox::exec::Operator::registerOperator( + std::make_unique()); + velox::exec::Operator::registerOperator( std::make_unique()); // Todo - Split Presto & Presto-on-Spark server into different classes // which will allow server specific operator registration. - facebook::velox::exec::Operator::registerOperator( - std::make_unique< - facebook::presto::operators::BroadcastWriteTranslator>()); + velox::exec::Operator::registerOperator( + std::make_unique()); } void PrestoServer::registerFunctions() { @@ -1285,13 +1289,15 @@ void PrestoServer::registerVectorSerdes() { if (!velox::isRegisteredVectorSerde()) { velox::serializer::presto::PrestoVectorSerde::registerVectorSerde(); } - if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kPresto)) { + if (!velox::isRegisteredNamedVectorSerde(velox::VectorSerde::Kind::kPresto)) { velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde(); } - if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kCompactRow)) { + if (!velox::isRegisteredNamedVectorSerde( + velox::VectorSerde::Kind::kCompactRow)) { velox::serializer::CompactRowVectorSerde::registerNamedVectorSerde(); } - if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kUnsafeRow)) { + if (!velox::isRegisteredNamedVectorSerde( + velox::VectorSerde::Kind::kUnsafeRow)) { velox::serializer::spark::UnsafeRowVectorSerde::registerNamedVectorSerde(); } } @@ -1332,7 +1338,7 @@ void PrestoServer::unregisterFileReadersAndWriters() { void PrestoServer::registerStatsCounters() { registerPrestoMetrics(); - registerVeloxMetrics(); + velox::registerVeloxMetrics(); } std::string PrestoServer::getLocalIp() const {