diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index 7c6bb405a8c..f0ed45a873e 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -100,14 +100,16 @@ int main(int argc, char *argv[]) { // The meta server has only one space, one part. partMan->addPart(0, 0, std::move(peersRet.value())); - // folly IOThreadPoolExecutor + // notice: ioThreadPool and acceptThreadPool will stopped when NebulaStore free raftservice auto ioPool = std::make_shared(FLAGS_num_io_threads); + auto acceptThreadPool = std::make_shared(1); nebula::kvstore::KVOptions options; options.dataPaths_ = {FLAGS_data_path}; options.partMan_ = std::move(partMan); - auto kvstore = std::make_unique(std::move(options), - ioPool, + auto kvstore = std::make_unique(std::move(options), + ioPool, + acceptThreadPool, localhost); if (!(kvstore->init())) { LOG(ERROR) << "nebula store init failed"; @@ -154,7 +156,15 @@ int main(int argc, char *argv[]) { gServer->setReusePort(FLAGS_reuse_port); gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection gServer->setIOThreadPool(ioPool); + gServer->setAcceptExecutor(acceptThreadPool); + + // set false to stop that gServer stop all io thread when call gServer->stop(); + // because NebulaStore's raft part dependencies the io thread pool. + gServer->setStopWorkersOnStopListening(false); gServer->serve(); // Will wait until the server shuts down + + // must stop the cpu worker first, because kvstore will free before gServer. + gServer->getThreadManager()->join(); } catch (const std::exception &e) { nebula::WebService::stop(); LOG(ERROR) << "Exception thrown: " << e.what(); @@ -169,7 +179,7 @@ int main(int argc, char *argv[]) { Status setupSignalHandler() { return nebula::SignalHandler::install( - {SIGINT, SIGTERM}, + {SIGINT, SIGTERM}, [](nebula::SignalHandler::GeneralSignalInfo *info) { signalHandler(info->sig()); }); @@ -181,7 +191,9 @@ void signalHandler(int sig) { case SIGINT: case SIGTERM: FLOG_INFO("Signal %d(%s) received, stopping this server", sig, ::strsignal(sig)); - gServer->stop(); + if (gServer) { + gServer->stop(); + } break; default: FLOG_ERROR("Signal %d(%s) received but ignored", sig, ::strsignal(sig)); diff --git a/src/daemons/StorageDaemon.cpp b/src/daemons/StorageDaemon.cpp index 7150db34070..e71464e20db 100644 --- a/src/daemons/StorageDaemon.cpp +++ b/src/daemons/StorageDaemon.cpp @@ -61,6 +61,7 @@ std::unique_ptr getStoreInstance( HostAddr localhost, std::vector paths, std::shared_ptr ioPool, + std::shared_ptr acceptPool, nebula::meta::MetaClient* metaClient, nebula::meta::SchemaManager* schemaMan) { nebula::kvstore::KVOptions options; @@ -73,6 +74,7 @@ std::unique_ptr getStoreInstance( if (FLAGS_store_type == "nebula") { auto nbStore = std::make_unique(std::move(options), ioPool, + acceptPool, localhost); if (!(nbStore->init())) { LOG(ERROR) << "nebula store init failed"; @@ -156,11 +158,12 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } + // notice: ioThreadPool and acceptThreadPool will stopped when NebulaStore free raftservice auto ioThreadPool = std::make_shared(FLAGS_num_io_threads); + auto acceptThreadPool = std::make_shared(1); // Meta client - auto metaClient = std::make_unique(ioThreadPool, - std::move(metaAddrsRet.value()), + auto metaClient = std::make_unique(std::move(metaAddrsRet.value()), localhost, true); if (!metaClient->waitForMetadReady()) { @@ -175,6 +178,7 @@ int main(int argc, char *argv[]) { std::unique_ptr kvstore = getStoreInstance(localhost, std::move(paths), ioThreadPool, + acceptThreadPool, metaClient.get(), schemaMan.get()); @@ -220,9 +224,18 @@ int main(int argc, char *argv[]) { gServer->setReusePort(FLAGS_reuse_port); gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection gServer->setIOThreadPool(ioThreadPool); + gServer->setAcceptExecutor(acceptThreadPool); gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads); gServer->setCPUWorkerThreadName("executor"); + + // set false to stop that gServer stop all io thread when call gServer->stop(); + // because NebulaStore's raft part dependencies the io thread pool. + gServer->setStopWorkersOnStopListening(false); + gServer->serve(); // Will wait until the server shuts down + + // must stop the cpu worker first, because kvstore will free before gServer. + gServer->getThreadManager()->join(); } catch (const std::exception& e) { nebula::WebService::stop(); LOG(ERROR) << "Start thrift server failed, error:" << e.what(); @@ -249,7 +262,9 @@ void signalHandler(int sig) { case SIGINT: case SIGTERM: FLOG_INFO("Signal %d(%s) received, stopping this server", sig, ::strsignal(sig)); - gServer->stop(); + if (gServer) { + gServer->stop(); + } break; default: FLOG_ERROR("Signal %d(%s) received but ignored", sig, ::strsignal(sig)); diff --git a/src/graph/ExecutionEngine.cpp b/src/graph/ExecutionEngine.cpp index 7f2eb69f650..c4af294463a 100644 --- a/src/graph/ExecutionEngine.cpp +++ b/src/graph/ExecutionEngine.cpp @@ -28,7 +28,7 @@ Status ExecutionEngine::init(std::shared_ptr ioExec if (!addrs.ok()) { return addrs.status(); } - metaClient_ = std::make_unique(ioExecutor, std::move(addrs.value())); + metaClient_ = std::make_unique(std::move(addrs.value())); // load data try 3 time bool loadDataOk = metaClient_->waitForMetadReady(3); if (!loadDataOk) { diff --git a/src/graph/test/TestEnv.cpp b/src/graph/test/TestEnv.cpp index 3525b01c281..2309af49c9f 100644 --- a/src/graph/test/TestEnv.cpp +++ b/src/graph/test/TestEnv.cpp @@ -35,7 +35,6 @@ void TestEnv::SetUp() { FLAGS_meta_server_addrs = folly::stringPrintf("127.0.0.1:%d", metaServerPort()); // Create storageServer - auto threadPool = std::make_shared(1); auto addrsRet = network::NetworkUtils::toHosts(folly::stringPrintf("127.0.0.1:%d", metaServerPort())); CHECK(addrsRet.ok()) << addrsRet.status(); @@ -45,8 +44,7 @@ void TestEnv::SetUp() { LOG(ERROR) << "Bad local host addr, status:" << hostRet.status(); } auto& localhost = hostRet.value(); - mClient_ = std::make_unique(threadPool, - std::move(addrsRet.value()), + mClient_ = std::make_unique(std::move(addrsRet.value()), localhost, true); auto r = mClient_->addHosts({localhost}).get(); diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index c2b3649840c..5eee8e3b212 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -37,6 +37,7 @@ namespace nebula { namespace kvstore { NebulaStore::~NebulaStore() { + // we must stop worker before raft service, because raftservice will stop io thread pool workers_->stop(); workers_->wait(); LOG(INFO) << "Stop the raft service..."; @@ -50,7 +51,7 @@ bool NebulaStore::init() { LOG(INFO) << "Start the raft service..."; workers_ = std::make_shared(); workers_->start(FLAGS_num_workers); - raftService_ = raftex::RaftexService::createService(ioPool_, raftAddr_.second); + raftService_ = raftex::RaftexService::createService(ioPool_, acceptPool_, raftAddr_.second); if (!raftService_->start()) { LOG(ERROR) << "Start the raft service failed"; return false; diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 443b196ed8d..7e6506a2fd0 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -40,8 +40,10 @@ class NebulaStore : public KVStore, public Handler { public: NebulaStore(KVOptions options, std::shared_ptr ioPool, + std::shared_ptr acceptPool, HostAddr serviceAddr) : ioPool_(ioPool) + , acceptPool_(acceptPool) , storeSvcAddr_(serviceAddr) , raftAddr_(getRaftAddr(serviceAddr)) , options_(std::move(options)) { @@ -187,6 +189,7 @@ class NebulaStore : public KVStore, public Handler { std::unordered_map> spaces_; std::shared_ptr ioPool_; + std::shared_ptr acceptPool_; std::shared_ptr workers_; HostAddr storeSvcAddr_; HostAddr raftAddr_; diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp index ff21cae4807..9c7d16e6f9c 100644 --- a/src/kvstore/raftex/RaftexService.cpp +++ b/src/kvstore/raftex/RaftexService.cpp @@ -18,7 +18,8 @@ namespace raftex { * ******************************************************/ std::shared_ptr RaftexService::createService( - std::shared_ptr pool, + std::shared_ptr ioPool, + std::shared_ptr acceptPool, uint16_t port) { auto svc = std::shared_ptr(new RaftexService()); CHECK(svc != nullptr) << "Failed to create a raft service"; @@ -27,7 +28,7 @@ std::shared_ptr RaftexService::createService( CHECK(svc->server_ != nullptr) << "Failed to create a thrift server"; svc->server_->setInterface(svc); - svc->initThriftServer(pool, port); + svc->initThriftServer(ioPool, acceptPool, port); return svc; } @@ -58,12 +59,15 @@ void RaftexService::waitUntilReady() { } -void RaftexService::initThriftServer(std::shared_ptr pool, - uint16_t port) { +void RaftexService::initThriftServer(std::shared_ptr ioPool, + std::shared_ptr acceptPool, + uint16_t port) { LOG(INFO) << "Init thrift server for raft service."; server_->setPort(port); - if (pool != nullptr) { - server_->setIOThreadPool(pool); + if (ioPool != nullptr && acceptPool != nullptr) { + server_->setIOThreadPool(ioPool); + server_->setAcceptExecutor(acceptPool); + server_->setStopWorkersOnStopListening(false); } } diff --git a/src/kvstore/raftex/RaftexService.h b/src/kvstore/raftex/RaftexService.h index b7dd4f592fa..c358384aa24 100644 --- a/src/kvstore/raftex/RaftexService.h +++ b/src/kvstore/raftex/RaftexService.h @@ -23,7 +23,8 @@ class IOThreadPoolObserver; class RaftexService : public cpp2::RaftexServiceSvIf { public: static std::shared_ptr createService( - std::shared_ptr pool, + std::shared_ptr ioPool, + std::shared_ptr acceptPool, uint16_t port = 0); virtual ~RaftexService(); @@ -47,7 +48,9 @@ class RaftexService : public cpp2::RaftexServiceSvIf { void removePartition(std::shared_ptr part); private: - void initThriftServer(std::shared_ptr pool, uint16_t port = 0); + void initThriftServer(std::shared_ptr ioPool, + std::shared_ptr acceptPool, + uint16_t port = 0); bool setup(); void serve(); diff --git a/src/kvstore/raftex/test/RaftexTestBase.cpp b/src/kvstore/raftex/test/RaftexTestBase.cpp index 5aa451a08b7..2b48441efb7 100644 --- a/src/kvstore/raftex/test/RaftexTestBase.cpp +++ b/src/kvstore/raftex/test/RaftexTestBase.cpp @@ -173,7 +173,7 @@ void setupRaft( // Set up services for (int i = 0; i < numCopies; ++i) { - services.emplace_back(RaftexService::createService(nullptr)); + services.emplace_back(RaftexService::createService(nullptr, nullptr)); if (!services.back()->start()) return; uint16_t port = services.back()->getServerPort(); diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index 71c9459bf7f..f4591f1a1cf 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -19,7 +19,6 @@ DECLARE_uint32(heartbeat_interval); namespace nebula { namespace kvstore { -auto ioThreadPool = std::make_shared(4); template void dump(const std::vector& v) { @@ -56,8 +55,13 @@ TEST(NebulaStoreTest, SimpleTest) { options.dataPaths_ = std::move(paths); options.partMan_ = std::move(partMan); HostAddr local = {0, 0}; + + auto ioThreadPool = std::make_shared(4); + auto acceptThreadPool = std::make_shared(1); + auto store = std::make_unique(std::move(options), ioThreadPool, + acceptThreadPool, local); store->init(); sleep(1); @@ -155,8 +159,13 @@ TEST(NebulaStoreTest, PartsTest) { options.dataPaths_ = std::move(paths); options.partMan_ = std::move(partMan); HostAddr local = {0, 0}; + + auto ioThreadPool = std::make_shared(4); + auto acceptThreadPool = std::make_shared(1); + auto store = std::make_unique(std::move(options), ioThreadPool, + acceptThreadPool, local); store->init(); auto check = [&](GraphSpaceID spaceId) { @@ -244,6 +253,7 @@ TEST(NebulaStoreTest, ThreeCopiesTest) { const std::string& path) -> std::unique_ptr { LOG(INFO) << "Start nebula store on " << peers[index]; auto sIoThreadPool = std::make_shared(4); + auto sAcceptThreadPool = std::make_shared(1); auto partMan = std::make_unique(); for (auto partId = 0; partId < 3; partId++) { PartMeta pm; @@ -260,6 +270,7 @@ TEST(NebulaStoreTest, ThreeCopiesTest) { HostAddr local = peers[index]; return std::make_unique(std::move(options), sIoThreadPool, + sAcceptThreadPool, local); }; int32_t replicas = 3; diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 7d78a75ae7f..9e630f0bf6d 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -20,6 +20,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { explicit MetaServiceHandler(kvstore::KVStore* kv) : kvstore_(kv) {} + ~MetaServiceHandler() { + LOG(INFO) << "~MetaServiceHandler"; + } /** * Parts distribution related operations. * */ diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index e6442636e55..3a01d75fc7e 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -11,18 +11,18 @@ DEFINE_int32(load_data_interval_secs, 2 * 60, "Load data interval"); DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval"); +DEFINE_int32(meta_client_io_thread_num, 2, "meta client io thread number"); namespace nebula { namespace meta { -MetaClient::MetaClient(std::shared_ptr ioThreadPool, - std::vector addrs, +MetaClient::MetaClient(std::vector addrs, HostAddr localHost, bool sendHeartBeat) - : ioThreadPool_(ioThreadPool) - , addrs_(std::move(addrs)) + : addrs_(std::move(addrs)) , localHost_(localHost) , sendHeartBeat_(sendHeartBeat) { + ioThreadPool_ = std::make_unique(FLAGS_meta_client_io_thread_num); CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required"; CHECK(!addrs_.empty()) << "No meta server address is specified. Meta server is required"; @@ -37,6 +37,8 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool MetaClient::~MetaClient() { bgThread_.stop(); bgThread_.wait(); + + ioThreadPool_->stop(); VLOG(3) << "~MetaClient"; } diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index 984e8a0663f..8742901acd3 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -61,8 +61,7 @@ class MetaChangedListener { class MetaClient { public: - explicit MetaClient(std::shared_ptr ioThreadPool, - std::vector addrs, + explicit MetaClient(std::vector addrs, HostAddr localHost = HostAddr(0, 0), bool sendHeartBeat = false); @@ -259,7 +258,7 @@ class MetaClient { const LocalCache& localCache); private: - std::shared_ptr ioThreadPool_; + std::unique_ptr ioThreadPool_{nullptr}; std::shared_ptr> clientsMan_; LocalCache localCache_; diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index c1e0390604b..fc0ef0d9148 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -36,13 +36,11 @@ TEST(MetaClientTest, InterfacesTest) { auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); GraphSpaceID spaceId = 0; - auto threadPool = std::make_shared(1); IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); auto clientPort = network::NetworkUtils::getAvailablePort(); HostAddr localHost{localIp, clientPort}; - auto client = std::make_shared(threadPool, - std::vector{HostAddr(localIp, sc->port_)}, + auto client = std::make_shared(std::vector{HostAddr(localIp, sc->port_)}, localHost); client->waitForMetadReady(); { @@ -301,11 +299,9 @@ TEST(MetaClientTest, TagTest) { auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); GraphSpaceID spaceId = 0; - auto threadPool = std::make_shared(1); IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); - auto client = std::make_shared(threadPool, - std::vector{HostAddr(localIp, sc->port_)}); + auto client = std::make_shared(std::vector{HostAddr(localIp, sc->port_)}); std::vector hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}}; auto r = client->addHosts(hosts).get(); ASSERT_TRUE(r.ok()); @@ -410,12 +406,10 @@ TEST(MetaClientTest, DiffTest) { int32_t localMetaPort = 0; auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); - auto threadPool = std::make_shared(1); IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); auto listener = std::make_unique(); - auto client = std::make_shared(threadPool, - std::vector{HostAddr(localIp, sc->port_)}); + auto client = std::make_shared(std::vector{HostAddr(localIp, sc->port_)}); client->waitForMetadReady(); client->registerListener(listener.get()); { @@ -462,14 +456,12 @@ TEST(MetaClientTest, HeartbeatTest) { fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); auto sc = TestUtils::mockMetaServer(10001, rootPath.path()); - auto threadPool = std::make_shared(1); IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); auto listener = std::make_unique(); auto clientPort = network::NetworkUtils::getAvailablePort(); HostAddr localHost{localIp, clientPort}; - auto client = std::make_shared(threadPool, - std::vector{HostAddr(localIp, 10001)}, + auto client = std::make_shared(std::vector{HostAddr(localIp, 10001)}, localHost, true); // send heartbeat client->addHosts({localHost}); diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index 81fc89e0648..4b02d521be7 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -33,6 +33,7 @@ class TestUtils { public: static std::unique_ptr initKV(const char* rootPath) { auto ioPool = std::make_shared(4); + auto acceptThreadPool = std::make_shared(1); auto partMan = std::make_unique(); // GraphSpaceID => {PartitionIDs} @@ -50,6 +51,7 @@ class TestUtils { auto store = std::make_unique(std::move(options), ioPool, + acceptThreadPool, localhost); store->init(); sleep(1); diff --git a/src/storage/StorageServiceHandler.h b/src/storage/StorageServiceHandler.h index 1469b243a69..58604d7058a 100644 --- a/src/storage/StorageServiceHandler.h +++ b/src/storage/StorageServiceHandler.h @@ -25,6 +25,9 @@ class StorageServiceHandler final : public cpp2::StorageServiceSvIf { : kvstore_(kvstore) , schemaMan_(schemaMan) {} + ~StorageServiceHandler() { + LOG(INFO) << "~StorageServiceHandler"; + } folly::Future future_getOutBound(const cpp2::GetNeighborsRequest& req) override; diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 4ae8ca86186..e47208b3942 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -46,8 +46,7 @@ TEST(StorageClientTest, VerticesInterfacesTest) { uint32_t localDataPort = network::NetworkUtils::getAvailablePort(); auto hostRet = nebula::network::NetworkUtils::toHostAddr("127.0.0.1", localDataPort); auto& localHost = hostRet.value(); - auto mClient - = std::make_unique(threadPool, std::move(addrs), localHost, true); + auto mClient = std::make_unique(std::move(addrs), localHost, true); LOG(INFO) << "Add hosts and create space...."; auto r = mClient->addHosts({HostAddr(localIp, localDataPort)}).get(); ASSERT_TRUE(r.ok()); diff --git a/src/storage/test/TestUtils.h b/src/storage/test/TestUtils.h index d8da23a5cad..1a56c01048d 100644 --- a/src/storage/test/TestUtils.h +++ b/src/storage/test/TestUtils.h @@ -35,6 +35,7 @@ class TestUtils { bool useMetaServer = false, std::shared_ptr cfFactory = nullptr) { auto ioPool = std::make_shared(4); + auto acceptThreadPool = std::make_shared(1); kvstore::KVOptions options; if (useMetaServer) { @@ -62,6 +63,7 @@ class TestUtils { options.cfFactory_ = std::move(cfFactory); auto store = std::make_unique(std::move(options), ioPool, + acceptThreadPool, localhost); store->init(); sleep(1);