Skip to content

Commit

Permalink
[native] Use Velox namespace explicitly in PrestoServer.cpp
Browse files Browse the repository at this point in the history
The namespace used for most Velox APIs was inconsistent in the
PrestoServer.cpp file. This PR makes Velox API usage consistent.
This also can avoid issues with ambiguous names if somehow another
API introduces a similar naming convention but in a different
namespace that could be used by Pretissimo.
  • Loading branch information
czentgr committed Dec 3, 2024
1 parent 55479fb commit 68b26f3
Showing 1 changed file with 60 additions and 54 deletions.
114 changes: 60 additions & 54 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
#include <sched.h>
#endif

using namespace facebook::velox;
using namespace facebook;

namespace facebook::presto {
namespace {
Expand Down Expand Up @@ -105,7 +105,7 @@ void enableChecksum() {
velox::exec::OutputBufferManager::getInstance().lock()->setListenerFactory(
[]() {
return std::make_unique<
serializer::presto::PrestoOutputStreamListener>();
velox::serializer::presto::PrestoOutputStreamListener>();
});
}

Expand Down Expand Up @@ -238,7 +238,7 @@ void PrestoServer::run() {
address_ = fmt::format("[{}]", address_);
}
nodeLocation_ = nodeConfig->nodeLocation();
} catch (const VeloxUserError& e) {
} catch (const velox::VeloxUserError& e) {
PRESTO_STARTUP_LOG(ERROR) << "Failed to start server due to " << e.what();
exit(EXIT_FAILURE);
}
Expand All @@ -253,8 +253,9 @@ void PrestoServer::run() {

// Register Velox connector factory for iceberg.
// The iceberg catalog is handled by the hive connector factory.
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>("iceberg"));
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
"iceberg"));

registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>("hive"));
Expand Down Expand Up @@ -458,7 +459,7 @@ void PrestoServer::run() {
const std::string& taskId,
int destination,
std::shared_ptr<velox::exec::ExchangeQueue> queue,
memory::MemoryPool* pool) {
velox::memory::MemoryPool* pool) {
return PrestoExchangeSource::create(
taskId,
destination,
Expand All @@ -470,11 +471,11 @@ void PrestoServer::run() {
sslContext_);
});

facebook::velox::exec::ExchangeSource::registerFactory(
velox::exec::ExchangeSource::registerFactory(
operators::UnsafeRowExchangeSource::createExchangeSource);

// Batch broadcast exchange source.
facebook::velox::exec::ExchangeSource::registerFactory(
velox::exec::ExchangeSource::registerFactory(
operators::BroadcastExchangeSource::createExchangeSource);

pool_ =
Expand Down Expand Up @@ -520,13 +521,13 @@ void PrestoServer::run() {

if (systemConfig->enableVeloxTaskLogging()) {
if (auto listener = getTaskListener()) {
exec::registerTaskListener(listener);
velox::exec::registerTaskListener(listener);
}
}

if (systemConfig->enableVeloxExprSetLogging()) {
if (auto listener = getExprSetListener()) {
exec::registerExprSetListener(listener);
velox::exec::registerExprSetListener(listener);
}
}
prestoServerOperations_ =
Expand Down Expand Up @@ -567,7 +568,7 @@ void PrestoServer::run() {
PRESTO_STARTUP_LOG(INFO) << "Starting all periodic tasks";

auto* memoryAllocator = velox::memory::memoryManager()->allocator();
auto* asyncDataCache = cache::AsyncDataCache::getInstance();
auto* asyncDataCache = velox::cache::AsyncDataCache::getInstance();
periodicTaskManager_ = std::make_unique<PeriodicTaskManager>(
driverExecutor_.get(),
spillerExecutor_.get(),
Expand Down Expand Up @@ -782,7 +783,7 @@ void PrestoServer::initializeThreadPools() {
}
}

std::unique_ptr<cache::SsdCache> PrestoServer::setupSsdCache() {
std::unique_ptr<velox::cache::SsdCache> PrestoServer::setupSsdCache() {
VELOX_CHECK_NULL(cacheExecutor_);
auto* systemConfig = SystemConfig::instance();
if (systemConfig->asyncCacheSsdGb() == 0) {
Expand All @@ -791,7 +792,7 @@ std::unique_ptr<cache::SsdCache> PrestoServer::setupSsdCache() {

constexpr int32_t kNumSsdShards = 16;
cacheExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(kNumSsdShards);
cache::SsdCache::Config cacheConfig(
velox::cache::SsdCache::Config cacheConfig(
systemConfig->asyncCacheSsdPath(),
systemConfig->asyncCacheSsdGb() << 30,
kNumSsdShards,
Expand All @@ -802,7 +803,7 @@ std::unique_ptr<cache::SsdCache> PrestoServer::setupSsdCache() {
systemConfig->ssdCacheReadVerificationEnabled());
PRESTO_STARTUP_LOG(INFO) << "Initializing SSD cache with "
<< cacheConfig.toString();
return std::make_unique<cache::SsdCache>(cacheConfig);
return std::make_unique<velox::cache::SsdCache>(cacheConfig);
}

void PrestoServer::initializeVeloxMemory() {
Expand All @@ -811,7 +812,7 @@ void PrestoServer::initializeVeloxMemory() {
PRESTO_STARTUP_LOG(INFO) << "Starting with node memory " << memoryGb << "GB";

// Set up velox memory manager.
memory::MemoryManagerOptions options;
velox::memory::MemoryManagerOptions options;
options.allocatorCapacity = memoryGb << 30;
if (systemConfig->useMmapAllocator()) {
options.useMmapAllocator = true;
Expand Down Expand Up @@ -865,34 +866,38 @@ void PrestoServer::initializeVeloxMemory() {
{std::string(SharedArbitratorConfig::kCheckUsageLeak),
folly::to<std::string>(systemConfig->enableMemoryLeakCheck())}};
}
memory::initializeMemoryManager(options);
velox::memory::initializeMemoryManager(options);
PRESTO_STARTUP_LOG(INFO) << "Memory manager has been setup: "
<< memory::memoryManager()->toString();
<< velox::memory::memoryManager()->toString();

if (systemConfig->asyncDataCacheEnabled()) {
std::unique_ptr<cache::SsdCache> ssd = setupSsdCache();
std::unique_ptr<velox::cache::SsdCache> ssd = setupSsdCache();
std::string cacheStr =
ssd == nullptr ? "AsyncDataCache" : "AsyncDataCache with SSD";

cache::AsyncDataCache::Options cacheOptions{
velox::cache::AsyncDataCache::Options cacheOptions{
systemConfig->asyncCacheMaxSsdWriteRatio(),
systemConfig->asyncCacheSsdSavableRatio(),
systemConfig->asyncCacheMinSsdSavableBytes()};
cache_ = cache::AsyncDataCache::create(
memory::memoryManager()->allocator(), std::move(ssd), cacheOptions);
cache::AsyncDataCache::setInstance(cache_.get());
cache_ = velox::cache::AsyncDataCache::create(
velox::memory::memoryManager()->allocator(),
std::move(ssd),
cacheOptions);
velox::cache::AsyncDataCache::setInstance(cache_.get());
PRESTO_STARTUP_LOG(INFO) << cacheStr << " has been setup";

if (isCacheTtlEnabled()) {
cache::CacheTTLController::create(*cache_);
velox::cache::CacheTTLController::create(*cache_);
PRESTO_STARTUP_LOG(INFO) << fmt::format(
"Cache TTL is enabled, with TTL {} enforced every {}.",
succinctMillis(std::chrono::duration_cast<std::chrono::milliseconds>(
systemConfig->cacheVeloxTtlThreshold())
.count()),
succinctMillis(std::chrono::duration_cast<std::chrono::milliseconds>(
systemConfig->cacheVeloxTtlCheckInterval())
.count()));
velox::succinctMillis(
std::chrono::duration_cast<std::chrono::milliseconds>(
systemConfig->cacheVeloxTtlThreshold())
.count()),
velox::succinctMillis(
std::chrono::duration_cast<std::chrono::milliseconds>(
systemConfig->cacheVeloxTtlCheckInterval())
.count()));
}
} else {
VELOX_CHECK_EQ(
Expand Down Expand Up @@ -1112,18 +1117,18 @@ PrestoServer::getAdditionalHttpServerFilters() {
void PrestoServer::registerConnectorFactories() {
// These checks for connector factories can be removed after we remove the
// registrations from the Velox library.
if (!connector::hasConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>());
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>(
if (!velox::connector::hasConnectorFactory(
velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>());
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
kHiveHadoop2ConnectorName));
}
if (!connector::hasConnectorFactory(
connector::tpch::TpchConnectorFactory::kTpchConnectorName)) {
connector::registerConnectorFactory(
std::make_shared<connector::tpch::TpchConnectorFactory>());
if (!velox::connector::hasConnectorFactory(
velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::tpch::TpchConnectorFactory>());
}
}

Expand Down Expand Up @@ -1173,7 +1178,7 @@ std::vector<std::string> PrestoServer::registerConnectors(
getPrestoToVeloxConnector(connectorName);

std::shared_ptr<velox::connector::Connector> connector =
facebook::velox::connector::getConnectorFactory(connectorName)
velox::connector::getConnectorFactory(connectorName)
->newConnector(
catalogName,
std::move(properties),
Expand All @@ -1195,7 +1200,7 @@ void PrestoServer::registerSystemConnector() {

void PrestoServer::unregisterConnectors() {
PRESTO_SHUTDOWN_LOG(INFO) << "Unregistering connectors";
auto connectors = facebook::velox::connector::getAllConnectors();
auto connectors = velox::connector::getAllConnectors();
if (connectors.empty()) {
PRESTO_SHUTDOWN_LOG(INFO) << "No connectors to unregister";
return;
Expand All @@ -1204,7 +1209,7 @@ void PrestoServer::unregisterConnectors() {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Unregistering " << connectors.size() << " connectors";
for (const auto& connectorEntry : connectors) {
if (facebook::velox::connector::unregisterConnector(connectorEntry.first)) {
if (velox::connector::unregisterConnector(connectorEntry.first)) {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Unregistered connector: " << connectorEntry.first;
} else {
Expand All @@ -1213,7 +1218,7 @@ void PrestoServer::unregisterConnectors() {
}
}

facebook::velox::connector::unregisterConnector("$system@system");
velox::connector::unregisterConnector("$system@system");
PRESTO_SHUTDOWN_LOG(INFO)
<< "Unregistered " << connectors.size() << " connectors";
}
Expand All @@ -1225,18 +1230,17 @@ void PrestoServer::registerShuffleInterfaceFactories() {
}

void PrestoServer::registerCustomOperators() {
facebook::velox::exec::Operator::registerOperator(
velox::exec::Operator::registerOperator(
std::make_unique<operators::PartitionAndSerializeTranslator>());
facebook::velox::exec::Operator::registerOperator(
std::make_unique<facebook::presto::operators::ShuffleWriteTranslator>());
facebook::velox::exec::Operator::registerOperator(
velox::exec::Operator::registerOperator(
std::make_unique<operators::ShuffleWriteTranslator>());
velox::exec::Operator::registerOperator(
std::make_unique<operators::ShuffleReadTranslator>());

// Todo - Split Presto & Presto-on-Spark server into different classes
// which will allow server specific operator registration.
facebook::velox::exec::Operator::registerOperator(
std::make_unique<
facebook::presto::operators::BroadcastWriteTranslator>());
velox::exec::Operator::registerOperator(
std::make_unique<operators::BroadcastWriteTranslator>());
}

void PrestoServer::registerFunctions() {
Expand Down Expand Up @@ -1285,13 +1289,15 @@ void PrestoServer::registerVectorSerdes() {
if (!velox::isRegisteredVectorSerde()) {
velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
}
if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kPresto)) {
if (!velox::isRegisteredNamedVectorSerde(velox::VectorSerde::Kind::kPresto)) {
velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde();
}
if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kCompactRow)) {
if (!velox::isRegisteredNamedVectorSerde(
velox::VectorSerde::Kind::kCompactRow)) {
velox::serializer::CompactRowVectorSerde::registerNamedVectorSerde();
}
if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kUnsafeRow)) {
if (!velox::isRegisteredNamedVectorSerde(
velox::VectorSerde::Kind::kUnsafeRow)) {
velox::serializer::spark::UnsafeRowVectorSerde::registerNamedVectorSerde();
}
}
Expand Down Expand Up @@ -1332,7 +1338,7 @@ void PrestoServer::unregisterFileReadersAndWriters() {

void PrestoServer::registerStatsCounters() {
registerPrestoMetrics();
registerVeloxMetrics();
velox::registerVeloxMetrics();
}

std::string PrestoServer::getLocalIp() const {
Expand Down

0 comments on commit 68b26f3

Please sign in to comment.