Skip to content

Commit

Permalink
[Native] Make PrestoServer create and pass connectorCpuExecutor to Co…
Browse files Browse the repository at this point in the history
…nnector

Create a CPUThreadPoolExecutor data member, connectorCpuExecutor_, for PrestoServer.
Pass it to every created Connector. Add a new config `connector.num-cpu-threads-hw-multiplier`
to control how many threads would be used for the executor.
Setting `connector.num-cpu-threads-hw-multiplier` to 0.0 (the default) leaves connectorCpuExecutor_ as nullptr.

Make a process-wide managed CPUThreadPoolExecutor instance available to all connectors.
Connectors can schedule CPU-bound async operators on it so that they do not occupy
the driver thread pool.
  • Loading branch information
gggrace14 authored and xiaoxmeng committed Dec 16, 2024
1 parent 8f3cd13 commit 2183c0a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 2 deletions.
26 changes: 25 additions & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,15 @@ void PrestoServer::run() {
// HTTP IO executor threads are joined.
driverExecutor_.reset();

if (connectorCpuExecutor_) {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Joining Connector CPU Executor '"
<< connectorCpuExecutor_->getName()
<< "': threads: " << connectorCpuExecutor_->numActiveThreads() << "/"
<< connectorCpuExecutor_->numThreads();
connectorCpuExecutor_->join();
}

if (connectorIoExecutor_) {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Joining Connector IO Executor '" << connectorIoExecutor_->getName()
Expand Down Expand Up @@ -1172,6 +1181,20 @@ std::vector<std::string> PrestoServer::registerConnectors(
const fs::path& configDirectoryPath) {
static const std::string kPropertiesExtension = ".properties";

const auto numConnectorCpuThreads = std::max<size_t>(
SystemConfig::instance()->connectorNumCpuThreadsHwMultiplier() *
std::thread::hardware_concurrency(),
0);
if (numConnectorCpuThreads > 0) {
connectorCpuExecutor_ = std::make_unique<folly::CPUThreadPoolExecutor>(
numConnectorCpuThreads,
std::make_shared<folly::NamedThreadFactory>("Connector"));

PRESTO_STARTUP_LOG(INFO)
<< "Connector CPU executor has " << connectorCpuExecutor_->numThreads()
<< " threads.";
}

const auto numConnectorIoThreads = std::max<size_t>(
SystemConfig::instance()->connectorNumIoThreadsHwMultiplier() *
std::thread::hardware_concurrency(),
Expand Down Expand Up @@ -1218,7 +1241,8 @@ std::vector<std::string> PrestoServer::registerConnectors(
->newConnector(
catalogName,
std::move(properties),
connectorIoExecutor_.get());
connectorIoExecutor_.get(),
connectorCpuExecutor_.get());
velox::connector::registerConnector(connector);
}
}
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ class PrestoServer {
// Executor for background writing into SSD cache.
std::unique_ptr<folly::CPUThreadPoolExecutor> cacheExecutor_;

// Executor for async execution for connectors.
std::unique_ptr<folly::CPUThreadPoolExecutor> connectorCpuExecutor_;

// Executor for async IO for connectors.
std::unique_ptr<folly::IOThreadPoolExecutor> connectorIoExecutor_;

Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ SystemConfig::SystemConfig() {
NONE_PROP(kHttpsClientCertAndKeyPath),
NUM_PROP(kExchangeHttpClientNumIoThreadsHwMultiplier, 1.0),
NUM_PROP(kExchangeHttpClientNumCpuThreadsHwMultiplier, 1.0),
NUM_PROP(kConnectorNumCpuThreadsHwMultiplier, 0.0),
NUM_PROP(kConnectorNumIoThreadsHwMultiplier, 1.0),
NUM_PROP(kDriverNumCpuThreadsHwMultiplier, 4.0),
BOOL_PROP(kDriverThreadsBatchSchedulingEnabled, false),
Expand Down Expand Up @@ -375,6 +376,10 @@ double SystemConfig::exchangeHttpClientNumCpuThreadsHwMultiplier() const {
.value();
}

// Returns the multiplier stored under kConnectorNumCpuThreadsHwMultiplier
// ("connector.num-cpu-threads-hw-multiplier"). Callers scale it by the
// hardware concurrency to size the connector CPU executor.
double SystemConfig::connectorNumCpuThreadsHwMultiplier() const {
  const auto multiplier =
      optionalProperty<double>(kConnectorNumCpuThreadsHwMultiplier);
  return multiplier.value();
}

// Returns the multiplier stored under kConnectorNumIoThreadsHwMultiplier.
// Callers scale it by the hardware concurrency to size the connector IO
// executor.
double SystemConfig::connectorNumIoThreadsHwMultiplier() const {
  const auto multiplier =
      optionalProperty<double>(kConnectorNumIoThreadsHwMultiplier);
  return multiplier.value();
}
Expand Down
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kHttpsClientCertAndKeyPath{
"https-client-cert-key-path"};

/// Floating point number used in calculating how many threads we would use
/// for CPU executor for connectors mainly for async operators:
/// hw_concurrency x multiplier.
/// If 0.0 then connector CPU executor would not be created.
/// 0.0 is default.
static constexpr std::string_view kConnectorNumCpuThreadsHwMultiplier{
"connector.num-cpu-threads-hw-multiplier"};

/// Floating point number used in calculating how many threads we would use
/// for IO executor for connectors mainly to do preload/prefetch:
/// hw_concurrency x multiplier.
Expand Down Expand Up @@ -724,6 +732,8 @@ class SystemConfig : public ConfigBase {

double exchangeHttpClientNumCpuThreadsHwMultiplier() const;

double connectorNumCpuThreadsHwMultiplier() const;

double connectorNumIoThreadsHwMultiplier() const;

double driverNumCpuThreadsHwMultiplier() const;
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 67 files
+2 −1 velox/connectors/Connector.h
+3 −2 velox/connectors/fuzzer/FuzzerConnector.h
+1 −1 velox/connectors/hive/HiveConfig.cpp
+3 −2 velox/connectors/hive/HiveConnector.h
+2 −1 velox/connectors/tests/ConnectorTest.cpp
+3 −2 velox/connectors/tpch/TpchConnector.h
+9 −0 velox/core/QueryConfig.h
+4 −0 velox/docs/configs.rst
+1 −0 velox/docs/develop/testing.rst
+14 −13 velox/docs/develop/testing/cache-fuzzer.rst
+195 −0 velox/docs/functions/presto/aggregate.rst
+5 −5 velox/docs/functions/presto/coverage.rst
+3 −3 velox/docs/functions/spark/regexp.rst
+19 −0 velox/docs/functions/spark/string.rst
+1 −1 velox/dwio/common/BufferedInput.h
+1 −1 velox/dwio/common/CacheInputStream.cpp
+6 −1 velox/dwio/common/CacheInputStream.h
+1 −1 velox/dwio/common/DirectInputStream.cpp
+1 −1 velox/dwio/common/DirectInputStream.h
+2 −2 velox/dwio/common/SeekableInputStream.cpp
+3 −3 velox/dwio/common/SeekableInputStream.h
+30 −7 velox/dwio/common/Statistics.h
+1 −1 velox/dwio/common/compression/PagedInputStream.h
+7 −4 velox/dwio/dwrf/reader/DwrfReader.cpp
+3 −1 velox/dwio/dwrf/reader/DwrfReader.h
+100 −41 velox/dwio/dwrf/reader/ReaderBase.cpp
+20 −11 velox/dwio/dwrf/reader/ReaderBase.h
+3 −3 velox/dwio/dwrf/test/ReaderTest.cpp
+1 −1 velox/dwio/dwrf/test/TestDecompression.cpp
+3 −1 velox/dwio/dwrf/test/WriterTest.cpp
+111 −39 velox/exec/fuzzer/CacheFuzzer.cpp
+2 −1 velox/exec/tests/AsyncConnectorTest.cpp
+6 −12 velox/exec/tests/PrintPlanWithStatsTest.cpp
+22 −5 velox/exec/tests/TableScanTest.cpp
+15 −3 velox/expression/fuzzer/ExpressionFuzzer.cpp
+43 −24 velox/functions/lib/Re2Functions.cpp
+22 −3 velox/functions/lib/Re2Functions.h
+33 −23 velox/functions/lib/tests/Re2FunctionsTest.cpp
+7 −2 velox/functions/prestosql/DateTimeFunctions.h
+6 −0 velox/functions/prestosql/DateTimeImpl.h
+5 −0 velox/functions/prestosql/aggregates/AggregateNames.h
+1 −0 velox/functions/prestosql/aggregates/CMakeLists.txt
+684 −0 velox/functions/prestosql/aggregates/ClassificationAggregation.cpp
+5 −0 velox/functions/prestosql/aggregates/RegisterAggregateFunctions.cpp
+1 −0 velox/functions/prestosql/aggregates/tests/CMakeLists.txt
+247 −0 velox/functions/prestosql/aggregates/tests/ClassificationAggregationTest.cpp
+11 −1 velox/functions/prestosql/fuzzer/AggregationFuzzerTest.cpp
+85 −0 velox/functions/prestosql/fuzzer/ClassificationAggregationInputGenerator.h
+16 −1 velox/functions/prestosql/fuzzer/WindowFuzzerTest.cpp
+21 −0 velox/functions/prestosql/tests/DateTimeFunctionsTest.cpp
+24 −0 velox/functions/prestosql/tests/RegexpReplaceTest.cpp
+1 −0 velox/functions/sparksql/CMakeLists.txt
+390 −0 velox/functions/sparksql/ConcatWs.cpp
+35 −0 velox/functions/sparksql/ConcatWs.h
+24 −19 velox/functions/sparksql/RegexFunctions.cpp
+19 −0 velox/functions/sparksql/Split.h
+5 −0 velox/functions/sparksql/registration/RegisterString.cpp
+1 −0 velox/functions/sparksql/tests/CMakeLists.txt
+305 −0 velox/functions/sparksql/tests/ConcatWsTest.cpp
+4 −2 velox/functions/sparksql/tests/RegexFunctionsTest.cpp
+13 −129 velox/serializers/CompactRowSerializer.cpp
+190 −0 velox/serializers/RowSerializer.h
+5 −141 velox/serializers/UnsafeRowSerializer.cpp
+3 −6 velox/serializers/tests/UnsafeRowSerializerTest.cpp
+2 −2 velox/type/Timestamp.h
+18 −0 velox/type/tz/TimeZoneMap.cpp
+8 −0 velox/type/tz/TimeZoneMap.h

0 comments on commit 2183c0a

Please sign in to comment.