From da7bea667ac617571893a4031954f2a689747fba Mon Sep 17 00:00:00 2001 From: zhangguoqing Date: Mon, 28 Oct 2019 13:05:04 +0800 Subject: [PATCH] Improve heartbeat between metad and storaged by MetaClient 1. Add the LastUpdateTimeMan to record the latest metadata change(insert, update, delete) time on the MetaServer side. 2. Enrich the information(lastUpdateTime, partitions leaderDist) of `heartbeat` exchange between MetaServer and MetaClient at storage node. 3. It could optimize the load data logic inside meta client by lastUpdateTime, which makes metadata synchronization smarter and more timely. 4. Remove sendHeartBeat_ and load_data_interval_secs from MetaClient class, and add the bool inStoraged_ to indicate whether a metaclient is in storaged. close #1173 close #1060 close #284 --- src/graph/test/ConfigTest.cpp | 2 - src/graph/test/DataTest.cpp | 4 +- src/graph/test/DeleteEdgesTest.cpp | 6 +- src/graph/test/GroupByLimitTest.cpp | 2 - src/graph/test/SchemaTest.cpp | 3 - src/graph/test/TestEnv.cpp | 6 +- src/graph/test/TraverseTestBase.h | 4 +- src/graph/test/UpdateTest.cpp | 2 - src/graph/test/UpdateTestBase.h | 4 +- src/interface/meta.thrift | 7 +- src/kvstore/NebulaStore.h | 8 +- src/kvstore/PartManager.cpp | 9 ++ src/kvstore/PartManager.h | 5 + src/meta/ActiveHostsMan.cpp | 29 ++++++ src/meta/ActiveHostsMan.h | 12 +++ src/meta/MetaServiceUtils.cpp | 27 ++++- src/meta/MetaServiceUtils.h | 9 ++ src/meta/client/MetaClient.cpp | 99 +++++++++---------- src/meta/client/MetaClient.h | 22 ++--- src/meta/processors/BaseProcessor.h | 10 +- src/meta/processors/BaseProcessor.inl | 27 ++++- src/meta/processors/Common.h | 1 + src/meta/processors/admin/BalanceTask.cpp | 3 + src/meta/processors/admin/HBProcessor.cpp | 15 ++- .../processors/customKV/MultiPutProcessor.cpp | 2 +- .../processors/customKV/RemoveProcessor.cpp | 2 +- .../customKV/RemoveRangeProcessor.cpp | 2 +- .../partsMan/ListHostsProcessor.cpp | 11 +-- src/meta/test/ActiveHostsManTest.cpp | 24 ++++- src/meta/test/BalanceIntegrationTest.cpp | 43 +++++--- src/meta/test/ConfigManTest.cpp | 39 ++++---- src/meta/test/HBProcessorTest.cpp | 5 +- src/meta/test/MetaClientTest.cpp | 49 +++++---- src/meta/test/MetaHttpDownloadHandlerTest.cpp | 1 - src/meta/test/MetaHttpStatusHandlerTest.cpp | 1 - src/meta/test/MetaServiceUtilsTest.cpp | 1 - src/storage/UpdateEdgeProcessor.cpp | 4 +- src/storage/UpdateVertexProcessor.cpp | 2 +- src/storage/test/StorageClientTest.cpp | 4 +- .../test/StorageHttpDownloadHandlerTest.cpp | 1 - .../test/StorageHttpStatusHandlerTest.cpp | 1 - .../storage-perf/StorageIntegrityTool.cpp | 8 +- 42 files changed, 333 insertions(+), 183 deletions(-) diff --git a/src/graph/test/ConfigTest.cpp b/src/graph/test/ConfigTest.cpp index 0d93f5d6ceb..2c5f16e59fa 100644 --- a/src/graph/test/ConfigTest.cpp +++ b/src/graph/test/ConfigTest.cpp @@ -12,8 +12,6 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { diff --git a/src/graph/test/DataTest.cpp b/src/graph/test/DataTest.cpp index e9aeeafada6..114b8454c64 100644 --- a/src/graph/test/DataTest.cpp +++ b/src/graph/test/DataTest.cpp @@ -9,7 +9,7 @@ #include "graph/test/TestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -168,7 +168,7 @@ AssertionResult DataTest::prepareSchema() { << " failed, error code "<< static_cast(code); } } - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); return TestOK(); } diff --git a/src/graph/test/DeleteEdgesTest.cpp b/src/graph/test/DeleteEdgesTest.cpp index eb0db7e6e2f..ad844c28d86 100644 --- a/src/graph/test/DeleteEdgesTest.cpp +++ b/src/graph/test/DeleteEdgesTest.cpp @@ -9,7 +9,7 @@ #include "graph/test/TestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -58,7 +58,7 @@ AssertionResult DeleteEdgesTest::prepareSchema() { return TestError() << "Do cmd:" << cmd << " failed, code:" << static_cast(code); } } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string cmd = "USE mySpace"; @@ -99,7 +99,7 @@ AssertionResult DeleteEdgesTest::prepareSchema() { return TestError() << "Do cmd:" << cmd << " failed, code:" << static_cast(code); } } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); return TestOK(); } diff --git a/src/graph/test/GroupByLimitTest.cpp b/src/graph/test/GroupByLimitTest.cpp index 7464193a0eb..0fe40c86a30 100644 --- a/src/graph/test/GroupByLimitTest.cpp +++ b/src/graph/test/GroupByLimitTest.cpp @@ -10,8 +10,6 @@ #include "graph/test/TraverseTestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { diff --git a/src/graph/test/SchemaTest.cpp b/src/graph/test/SchemaTest.cpp index 8dccca15265..697bc8a4f91 100644 --- a/src/graph/test/SchemaTest.cpp +++ b/src/graph/test/SchemaTest.cpp @@ -10,8 +10,6 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { @@ -714,7 +712,6 @@ TEST_F(SchemaTest, metaCommunication) { ASSERT_EQ(1, (*(resp.get_rows())).size()); } - sleep(FLAGS_load_data_interval_secs + 1); int retry = 60; while (retry-- > 0) { auto spaceResult = gEnv->metaClient()->getSpaceIdByNameFromCache("default_space"); diff --git a/src/graph/test/TestEnv.cpp b/src/graph/test/TestEnv.cpp index 118affc1010..e591bf995b2 100644 --- a/src/graph/test/TestEnv.cpp +++ b/src/graph/test/TestEnv.cpp @@ -9,7 +9,7 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); DECLARE_string(meta_server_addrs); namespace nebula { @@ -27,7 +27,7 @@ TestEnv::~TestEnv() { void TestEnv::SetUp() { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; const nebula::ClusterID kClusterId = 10; // Create metaServer metaServer_ = nebula::meta::TestUtils::mockMetaServer( @@ -72,7 +72,7 @@ void TestEnv::SetUp() { void TestEnv::TearDown() { // TO make sure the drop space be invoked on storage server - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); graphServer_.reset(); storageServer_.reset(); mClient_.reset(); diff --git a/src/graph/test/TraverseTestBase.h b/src/graph/test/TraverseTestBase.h index e5a5c13e745..b18ab2b2a8a 100644 --- a/src/graph/test/TraverseTestBase.h +++ b/src/graph/test/TraverseTestBase.h @@ -13,7 +13,7 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -410,7 +410,7 @@ AssertionResult TraverseTestBase::prepareSchema() { return TestError() << "Do cmd:" << cmd << " failed"; } } - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); return TestOK(); } diff --git a/src/graph/test/UpdateTest.cpp b/src/graph/test/UpdateTest.cpp index d7b102843c6..6e3129be203 100644 --- a/src/graph/test/UpdateTest.cpp +++ b/src/graph/test/UpdateTest.cpp @@ -10,8 +10,6 @@ #include "graph/test/UpdateTestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { diff --git a/src/graph/test/UpdateTestBase.h b/src/graph/test/UpdateTestBase.h index 7eefc7aed06..d5e92120a57 100644 --- a/src/graph/test/UpdateTestBase.h +++ b/src/graph/test/UpdateTestBase.h @@ -12,7 +12,7 @@ #include "graph/test/TestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -118,7 +118,7 @@ AssertionResult UpdateTestBase::prepareSchema() { return TestError() << "Do cmd:" << cmd << " failed"; } } - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); return TestOK(); } diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 40d428ae9f7..c1b525fb1f7 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -359,11 +359,14 @@ struct HBResp { 1: ErrorCode code, 2: common.HostAddr leader, 3: common.ClusterID cluster_id, + 4: i64 last_update_time_in_ms, } struct HBReq { - 1: common.HostAddr host, - 2: common.ClusterID cluster_id, + 1: bool in_storaged, + 2: common.HostAddr host, + 3: common.ClusterID cluster_id, + 4: optional map> (cpp.template = "std::unordered_map") leader_partIds; } struct CreateUserReq { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index f656e79c5ab..d30a24a88f9 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -161,11 +161,10 @@ class NebulaStore : public KVStore, public Handler { ResultCode flush(GraphSpaceID spaceId) override; - int32_t allLeader(std::unordered_map>& leaderIds) override; - bool isLeader(GraphSpaceID spaceId, PartitionID partId); + ErrorOr> space(GraphSpaceID spaceId); + /** * Implement four interfaces in Handler. * */ @@ -177,7 +176,8 @@ class NebulaStore : public KVStore, public Handler { void removePart(GraphSpaceID spaceId, PartitionID partId) override; - ErrorOr> space(GraphSpaceID spaceId); + int32_t allLeader(std::unordered_map>& leaderIds) override; private: void updateSpaceOption(GraphSpaceID spaceId, diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp index 381ed9a343d..57aff978f83 100644 --- a/src/kvstore/PartManager.cpp +++ b/src/kvstore/PartManager.cpp @@ -166,5 +166,14 @@ void MetaServerBasedPartManager::onPartUpdated(const PartMeta& partMeta) { UNUSED(partMeta); } +void MetaServerBasedPartManager::fetchLeaderInfo( + std::unordered_map>& leaderIds) { + if (handler_ != nullptr) { + handler_->allLeader(leaderIds); + } else { + VLOG(1) << "handler_ is nullptr!"; + } +} + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index fd618f45ed1..a53da51ec74 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -24,6 +24,8 @@ class Handler { bool isDbOption) = 0; virtual void removeSpace(GraphSpaceID spaceId) = 0; virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0; + virtual int32_t allLeader(std::unordered_map>& leaderIds) = 0; }; @@ -164,6 +166,9 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL void onPartUpdated(const PartMeta& partMeta) override; + void fetchLeaderInfo(std::unordered_map>& leaderParts) override; + HostAddr getLocalHost() { return localHost_; } diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 7021f6ce42f..a5bd6610266 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -59,5 +59,34 @@ bool ActiveHostsMan::isLived(kvstore::KVStore* kv, const HostAddr& host) { return std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end(); } +kvstore::ResultCode LastUpdateTimeMan::update(kvstore::KVStore* kv, const int64_t timeInMilliSec) { + CHECK_NOTNULL(kv); + std::vector data; + data.emplace_back(MetaServiceUtils::lastUpdateTimeKey(), + MetaServiceUtils::lastUpdateTimeVal(timeInMilliSec)); + + folly::SharedMutex::WriteHolder wHolder(LockUtils::lastUpdateTimeLock()); + folly::Baton baton; + kvstore::ResultCode ret; + kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), + [&] (kvstore::ResultCode code) { + ret = code; + baton.post(); + }); + baton.wait(); + return ret; +} + +int64_t LastUpdateTimeMan::get(kvstore::KVStore* kv) { + CHECK_NOTNULL(kv); + auto key = MetaServiceUtils::lastUpdateTimeKey(); + std::string val; + auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val); + if (ret == kvstore::ResultCode::SUCCEEDED) { + return *reinterpret_cast(val.data()); + } + return 0; +} + } // namespace meta } // namespace nebula diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index 8f3a06198bd..4bda8ff0b9a 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -59,6 +59,18 @@ class ActiveHostsMan final { ActiveHostsMan() = default; }; +class LastUpdateTimeMan final { +public: + ~LastUpdateTimeMan() = default; + + static kvstore::ResultCode update(kvstore::KVStore* kv, const int64_t timeInMilliSec); + + static int64_t get(kvstore::KVStore* kv); + +protected: + LastUpdateTimeMan() = default; +}; + } // namespace meta } // namespace nebula diff --git a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp index e1997e2df59..4d52e499ff6 100644 --- a/src/meta/MetaServiceUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -21,10 +21,25 @@ const std::string kUsersTable = "__users__"; // NOLINT const std::string kRolesTable = "__roles__"; // NOLINT const std::string kConfigsTable = "__configs__"; // NOLINT const std::string kDefaultTable = "__default__"; // NOLINT +const std::string kLastUpdateTimeTable = "__last_update_time__"; // NOLINT const std::string kHostOnline = "Online"; // NOLINT const std::string kHostOffline = "Offline"; // NOLINT +std::string MetaServiceUtils::lastUpdateTimeKey() { + std::string key; + key.reserve(128); + key.append(kLastUpdateTimeTable.data(), kLastUpdateTimeTable.size()); + return key; +} + +std::string MetaServiceUtils::lastUpdateTimeVal(const int64_t timeInMilliSec) { + std::string val; + val.reserve(sizeof(int64_t)); + val.append(reinterpret_cast(&timeInMilliSec), sizeof(int64_t)); + return val; +} + std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) { std::string key; key.reserve(128); @@ -39,7 +54,7 @@ std::string MetaServiceUtils::spaceVal(const cpp2::SpaceProperties &properties) return val; } -cpp2::SpaceProperties MetaServiceUtils::parseSpace(folly::StringPiece rawData) { +cpp2::SpaceProperties MetaServiceUtils::parseSpace(folly::StringPiece rawData) { cpp2::SpaceProperties properties; apache::thrift::CompactSerializer::deserialize(rawData, properties); return properties; @@ -66,6 +81,16 @@ std::string MetaServiceUtils::partKey(GraphSpaceID spaceId, PartitionID partId) return key; } +GraphSpaceID MetaServiceUtils::parsePartKeySpaceId(folly::StringPiece key) { + return *reinterpret_cast(key.data() + kPartsTable.size()); +} + +PartitionID MetaServiceUtils::parsePartKeyPartId(folly::StringPiece key) { + return *reinterpret_cast(key.data() + + kPartsTable.size() + + sizeof(GraphSpaceID)); +} + std::string MetaServiceUtils::partVal(const std::vector& hosts) { std::string val; val.reserve(128); diff --git a/src/meta/MetaServiceUtils.h b/src/meta/MetaServiceUtils.h index e49d3e49dd2..f6a62f5efdd 100644 --- a/src/meta/MetaServiceUtils.h +++ b/src/meta/MetaServiceUtils.h @@ -10,6 +10,7 @@ #include "base/Base.h" #include "base/Status.h" #include "interface/gen-cpp2/meta_types.h" +#include "meta/ActiveHostsMan.h" namespace nebula { namespace meta { @@ -28,6 +29,10 @@ class MetaServiceUtils final { public: MetaServiceUtils() = delete; + static std::string lastUpdateTimeKey(); + + static std::string lastUpdateTimeVal(const int64_t timeInMilliSec); + static std::string spaceKey(GraphSpaceID spaceId); static std::string spaceVal(const cpp2::SpaceProperties &properties); @@ -42,6 +47,10 @@ class MetaServiceUtils final { static std::string partKey(GraphSpaceID spaceId, PartitionID partId); + static GraphSpaceID parsePartKeySpaceId(folly::StringPiece key); + + static PartitionID parsePartKeyPartId(folly::StringPiece key); + static std::string partVal(const std::vector& hosts); static const std::string& partPrefix(); diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 8fac27cff31..f29a1d11542 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -15,7 +15,6 @@ #include "stats/StatsManager.h" -DEFINE_int32(load_data_interval_secs, 1, "Load data interval"); DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval"); DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry"); DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry"); @@ -29,13 +28,13 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool std::vector addrs, HostAddr localHost, ClusterID clusterId, - bool sendHeartBeat, + bool inStoraged, stats::Stats *stats) : ioThreadPool_(ioThreadPool) , addrs_(std::move(addrs)) , localHost_(localHost) , clusterId_(clusterId) - , sendHeartBeat_(sendHeartBeat) + , inStoraged_(inStoraged) , stats_(stats) { CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required"; CHECK(!addrs_.empty()) @@ -57,16 +56,18 @@ MetaClient::~MetaClient() { bool MetaClient::isMetadReady() { - if (sendHeartBeat_) { - auto ret = heartbeat().get(); - if (!ret.ok() && ret.status() != Status::LeaderChanged()) { - LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); - ready_ = false; - return ready_; - } - } // end if - loadData(); - loadCfg(); + auto ret = heartbeat().get(); + if (!ret.ok() && ret.status() != Status::LeaderChanged()) { + LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); + ready_ = false; + return ready_; + } + + bool ldRet = loadData(); + bool lcRet = loadCfg(); + if (ldRet && lcRet) { + localLastUpdateTime_ = metadLastUpdateTime_; + } return ready_; } @@ -88,15 +89,11 @@ bool MetaClient::waitForMetadReady(int count, int retryIntervalSecs) { } CHECK(bgThread_->start()); - if (sendHeartBeat_) { - LOG(INFO) << "Register time task for heartbeat!"; - size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900); - bgThread_->addTimerTask(delayMS, - FLAGS_heartbeat_interval_secs * 1000, - &MetaClient::heartBeatThreadFunc, this); - } - addLoadDataTask(); - addLoadCfgTask(); + LOG(INFO) << "Register time task for heartbeat!"; + size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900); + bgThread_->addTimerTask(delayMS, + FLAGS_heartbeat_interval_secs * 1000, + &MetaClient::heartBeatThreadFunc, this); return ready_; } @@ -115,11 +112,15 @@ void MetaClient::heartBeatThreadFunc() { LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); return; } -} -void MetaClient::loadDataThreadFunc() { - loadData(); - addLoadDataTask(); + // if MetaServer has some changes, refesh the localCache_ + if (localLastUpdateTime_ < metadLastUpdateTime_) { + bool ldRet = loadData(); + bool lcRet = loadCfg(); + if (ldRet && lcRet) { + localLastUpdateTime_ = metadLastUpdateTime_; + } + } } bool MetaClient::loadData() { @@ -191,13 +192,6 @@ bool MetaClient::loadData() { return true; } -void MetaClient::addLoadDataTask() { - size_t delayMS = FLAGS_load_data_interval_secs * 1000 + folly::Random::rand32(900); - bgThread_->addDelayTask(delayMS, &MetaClient::loadDataThreadFunc, this); - LOG(INFO) << "Load data completed, call after " << delayMS << " ms"; -} - - bool MetaClient::loadSchemas(GraphSpaceID spaceId, std::shared_ptr spaceInfoCache, SpaceTagNameIdMap &tagNameIdMap, @@ -1130,7 +1124,6 @@ StatusOr MetaClient::getNewestTagVerFromCache(const GraphSpaceID& spa return it->second; } - StatusOr MetaClient::getNewestEdgeVerFromCache(const GraphSpaceID& space, const EdgeType& edgeType) { if (!ready_) { @@ -1144,17 +1137,31 @@ StatusOr MetaClient::getNewestEdgeVerFromCache(const GraphSpaceID& sp return it->second; } - folly::Future> MetaClient::heartbeat() { if (clusterId_.load() == 0) { clusterId_ = ClusterIdMan::getClusterIdFromFile(FLAGS_cluster_id_path); } cpp2::HBReq req; + req.set_in_storaged(inStoraged_); nebula::cpp2::HostAddr thriftHost; thriftHost.set_ip(localHost_.first); thriftHost.set_port(localHost_.second); req.set_host(std::move(thriftHost)); req.set_cluster_id(clusterId_.load()); + + if (inStoraged_ && listener_ != nullptr) { + std::unordered_map> leaderIds; + listener_->fetchLeaderInfo(leaderIds); + if (leaderIds_ != leaderIds) { + { + folly::RWSpinLock::WriteHolder holder(leaderIdsLock_); + leaderIds_.clear(); + leaderIds_ = leaderIds; + } + req.set_leader_partIds(std::move(leaderIds)); + } + } + folly::Promise> promise; auto future = promise.getFuture(); VLOG(1) << "Send heartbeat to " << leader_ << ", clusterId " << req.get_cluster_id(); @@ -1171,7 +1178,9 @@ folly::Future> MetaClient::heartbeat() { << FLAGS_cluster_id_path; } } - return true; + metadLastUpdateTime_ = resp.get_last_update_time_in_ms(); + LOG(INFO) << "Metad last update time: " << metadLastUpdateTime_; + return true; // resp.code == cpp2::ErrorCode::SUCCEEDED }, std::move(promise), true); return future; } @@ -1351,11 +1360,6 @@ MetaClient::listConfigs(const cpp2::ConfigModule& module) { return future; } -void MetaClient::loadCfgThreadFunc() { - loadCfg(); - addLoadCfgTask(); -} - bool MetaClient::registerCfg() { auto ret = regConfig(gflagsDeclared_).get(); if (ret.ok()) { @@ -1365,9 +1369,9 @@ bool MetaClient::registerCfg() { return configReady_; } -void MetaClient::loadCfg() { +bool MetaClient::loadCfg() { if (!configReady_ && !registerCfg()) { - return; + return false; } // only load current module's config is enough auto ret = listConfigs(gflagsModule_).get(); @@ -1397,14 +1401,9 @@ void MetaClient::loadCfg() { } } else { LOG(ERROR) << "Load configs failed: " << ret.status(); - return; + return false; } -} - -void MetaClient::addLoadCfgTask() { - size_t delayMS = FLAGS_load_data_interval_secs * 1000 + folly::Random::rand32(900); - bgThread_->addDelayTask(delayMS, &MetaClient::loadCfgThreadFunc, this); - LOG(INFO) << "Load configs completed, call after " << delayMS << " ms"; + return true; } void MetaClient::updateGflagsValue(const ConfigItem& item) { diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index c8e2988897a..00368df73f8 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -92,6 +92,8 @@ class MetaChangedListener { virtual void onPartAdded(const PartMeta& partMeta) = 0; virtual void onPartRemoved(GraphSpaceID spaceId, PartitionID partId) = 0; virtual void onPartUpdated(const PartMeta& partMeta) = 0; + virtual void fetchLeaderInfo(std::unordered_map>& leaderIds) = 0; }; class MetaClient { @@ -109,10 +111,9 @@ class MetaClient { std::vector addrs, HostAddr localHost = HostAddr(0, 0), ClusterID clusterId = 0, - bool sendHeartBeat = false, + bool inStoraged = true, stats::Stats *stats = nullptr); - virtual ~MetaClient(); bool isMetadReady(); @@ -288,18 +289,12 @@ class MetaClient { Status refreshCache(); protected: - void loadDataThreadFunc(); // Return true if load succeeded. bool loadData(); - - void addLoadDataTask(); - + bool loadCfg(); void heartBeatThreadFunc(); - void loadCfgThreadFunc(); - void loadCfg(); bool registerCfg(); - void addLoadCfgTask(); void updateGflagsValue(const ConfigItem& item); void updateNestedGflags(const std::string& name); @@ -362,6 +357,11 @@ class MetaClient { std::shared_ptr ioThreadPool_; std::shared_ptr> clientsMan_; + std::unordered_map> leaderIds_; + folly::RWSpinLock leaderIdsLock_; + int64_t localLastUpdateTime_{0}; + int64_t metadLastUpdateTime_{0}; + LocalCache localCache_; std::vector addrs_; // The lock used to protect active_ and leader_. @@ -377,13 +377,13 @@ class MetaClient { SpaceEdgeTypeNameMap spaceEdgeIndexByType_; SpaceNewestTagVerMap spaceNewestTagVerMap_; SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_; - SpaceAllEdgeMap spaceAllEdgeMap_; + SpaceAllEdgeMap spaceAllEdgeMap_; folly::RWSpinLock localCacheLock_; MetaChangedListener* listener_{nullptr}; folly::RWSpinLock listenerLock_; std::atomic clusterId_{0}; bool isRunning_{false}; - bool sendHeartBeat_{false}; + bool inStoraged_{true}; std::atomic_bool ready_{false}; MetaConfigMap metaConfigMap_; folly::RWSpinLock configCacheLock_; diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index b404655b305..2def9e1b9cd 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -18,6 +18,7 @@ #include "meta/common/MetaCommon.h" #include "network/NetworkUtils.h" #include "meta/processors/Common.h" +#include "meta/ActiveHostsMan.h" #include "stats/Stats.h" namespace nebula { @@ -132,7 +133,7 @@ class BaseProcessor { /** * General put function. * */ - void doPut(std::vector data); + void doPut(std::vector data, bool refresh = true); StatusOr> doPrefix(const std::string& key); @@ -149,13 +150,14 @@ class BaseProcessor { /** * General remove function. * */ - void doRemove(const std::string& key); + void doRemove(const std::string& key, bool refresh = true); /** * Remove keys from start to end, doesn't contain end. * */ void doRemoveRange(const std::string& start, - const std::string& end); + const std::string& end, + bool refresh = true); /** * Scan keys from start to end, doesn't contain end. @@ -165,7 +167,7 @@ class BaseProcessor { /** * General multi remove function. **/ - void doMultiRemove(std::vector keys); + void doMultiRemove(std::vector keys, bool refresh = true); /** * Get all hosts diff --git a/src/meta/processors/BaseProcessor.inl b/src/meta/processors/BaseProcessor.inl index e8cec4e6882..5fa09acf677 100644 --- a/src/meta/processors/BaseProcessor.inl +++ b/src/meta/processors/BaseProcessor.inl @@ -11,7 +11,13 @@ namespace nebula { namespace meta { template -void BaseProcessor::doPut(std::vector data) { +void BaseProcessor::doPut(std::vector data, bool refresh) { + if (refresh) { + int64_t now = time::WallClock::fastNowInMilliSec(); + data.emplace_back(MetaServiceUtils::lastUpdateTimeKey(), + MetaServiceUtils::lastUpdateTimeVal(now)); + } + kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), @@ -62,7 +68,11 @@ BaseProcessor::doMultiGet(const std::vector& keys) { template -void BaseProcessor::doRemove(const std::string& key) { +void BaseProcessor::doRemove(const std::string& key, bool refresh) { + if (refresh) { + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); + } + kvstore_->asyncRemove(kDefaultSpaceId, kDefaultPartId, key, @@ -74,7 +84,11 @@ void BaseProcessor::doRemove(const std::string& key) { template -void BaseProcessor::doMultiRemove(std::vector keys) { +void BaseProcessor::doMultiRemove(std::vector keys, bool refresh) { + if (refresh) { + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); + } + kvstore_->asyncMultiRemove(kDefaultSpaceId, kDefaultPartId, std::move(keys), @@ -87,7 +101,12 @@ void BaseProcessor::doMultiRemove(std::vector keys) { template void BaseProcessor::doRemoveRange(const std::string& start, - const std::string& end) { + const std::string& end, + bool refresh) { + if (refresh) { + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); + } + kvstore_->asyncRemoveRange(kDefaultSpaceId, kDefaultPartId, start, diff --git a/src/meta/processors/Common.h b/src/meta/processors/Common.h index 4018252cd58..43a645d47dd 100644 --- a/src/meta/processors/Common.h +++ b/src/meta/processors/Common.h @@ -26,6 +26,7 @@ class LockUtils { return l; \ } +GENERATE_LOCK(lastUpdateTime); GENERATE_LOCK(space); GENERATE_LOCK(id); GENERATE_LOCK(tag); diff --git a/src/meta/processors/admin/BalanceTask.cpp b/src/meta/processors/admin/BalanceTask.cpp index 8b2d2572fa2..f65d87f18d8 100644 --- a/src/meta/processors/admin/BalanceTask.cpp +++ b/src/meta/processors/admin/BalanceTask.cpp @@ -149,6 +149,9 @@ void BalanceTask::invoke() { ret_ = Result::FAILED; } else { status_ = Status::REMOVE_PART_ON_SRC; + if (kv_ != nullptr) { + LastUpdateTimeMan::update(kv_, time::WallClock::fastNowInMilliSec()); + } } invoke(); }); diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index a4db3ffebb8..716a1c7c942 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -15,7 +15,6 @@ DEFINE_bool(hosts_whitelist_enabled, false, "Check host whether in whitelist whe namespace nebula { namespace meta { - void HBProcessor::process(const cpp2::HBReq& req) { HostAddr host(req.host.ip, req.host.port); if (FLAGS_hosts_whitelist_enabled @@ -39,8 +38,16 @@ void HBProcessor::process(const cpp2::HBReq& req) { } LOG(INFO) << "Receive heartbeat from " << host; - HostInfo info(time::WallClock::fastNowInMilliSec()); - auto ret = ActiveHostsMan::updateHostInfo(kvstore_, host, info); + auto ret = kvstore::ResultCode::SUCCEEDED; + if (req.get_in_storaged()) { + HostInfo info(time::WallClock::fastNowInMilliSec()); + ret = ActiveHostsMan::updateHostInfo(kvstore_, host, info); + if (req.__isset.leader_partIds) { + const auto& leaderIds = req.get_leader_partIds(); + UNUSED(leaderIds); + // TODO(zhangguoqing) put the leader parts information into kvstore + } + } resp_.set_code(to(ret)); if (ret == kvstore::ResultCode::ERR_LEADER_CHANGED) { auto leaderRet = kvstore_->partLeader(kDefaultSpaceId, kDefaultPartId); @@ -48,6 +55,8 @@ void HBProcessor::process(const cpp2::HBReq& req) { resp_.set_leader(toThriftHost(nebula::value(leaderRet))); } } + int64_t lastUpdateTime = LastUpdateTimeMan::get(this->kvstore_); + resp_.set_last_update_time_in_ms(lastUpdateTime); onFinished(); } diff --git a/src/meta/processors/customKV/MultiPutProcessor.cpp b/src/meta/processors/customKV/MultiPutProcessor.cpp index 6ab93ac6bc7..0bcedda101b 100644 --- a/src/meta/processors/customKV/MultiPutProcessor.cpp +++ b/src/meta/processors/customKV/MultiPutProcessor.cpp @@ -16,7 +16,7 @@ void MultiPutProcessor::process(const cpp2::MultiPutReq& req) { data.emplace_back(MetaServiceUtils::assembleSegmentKey(req.get_segment(), pair.get_key()), pair.get_value()); } - doPut(std::move(data)); + doPut(std::move(data), false); } } // namespace meta diff --git a/src/meta/processors/customKV/RemoveProcessor.cpp b/src/meta/processors/customKV/RemoveProcessor.cpp index a19e6540efa..7e157ee23ab 100644 --- a/src/meta/processors/customKV/RemoveProcessor.cpp +++ b/src/meta/processors/customKV/RemoveProcessor.cpp @@ -12,7 +12,7 @@ namespace meta { void RemoveProcessor::process(const cpp2::RemoveReq& req) { CHECK_SEGMENT(req.get_segment()); auto key = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_key()); - doRemove(key); + doRemove(key, false); } } // namespace meta diff --git a/src/meta/processors/customKV/RemoveRangeProcessor.cpp b/src/meta/processors/customKV/RemoveRangeProcessor.cpp index 905fcd916e1..46346d54937 100644 --- a/src/meta/processors/customKV/RemoveRangeProcessor.cpp +++ b/src/meta/processors/customKV/RemoveRangeProcessor.cpp @@ -13,7 +13,7 @@ void RemoveRangeProcessor::process(const cpp2::RemoveRangeReq& req) { CHECK_SEGMENT(req.get_segment()); auto start = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_start()); auto end = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_end()); - doRemoveRange(start, end); + doRemoveRange(start, end, false); } } // namespace meta diff --git a/src/meta/processors/partsMan/ListHostsProcessor.cpp b/src/meta/processors/partsMan/ListHostsProcessor.cpp index 0604ae373f9..dc41b7e9746 100644 --- a/src/meta/processors/partsMan/ListHostsProcessor.cpp +++ b/src/meta/processors/partsMan/ListHostsProcessor.cpp @@ -50,10 +50,8 @@ StatusOr> ListHostsProcessor::allHostsWithStatus( std::vector removeHostsKey; while (iter->valid()) { cpp2::HostItem item; - nebula::cpp2::HostAddr host; - auto hostAddrPiece = iter->key().subpiece(hostPrefix.size()); - memcpy(&host, hostAddrPiece.data(), hostAddrPiece.size()); - item.set_hostAddr(host); + auto host = MetaServiceUtils::parseHostKey(iter->key()); + item.set_hostAddr(std::move(host)); HostInfo info = HostInfo::decode(iter->val()); if (now - info.lastHBTimeInMilliSec_ < FLAGS_removed_threshold_sec * 1000) { if (now - info.lastHBTimeInMilliSec_ < FLAGS_expired_threshold_sec * 1000) { @@ -97,9 +95,7 @@ StatusOr> ListHostsProcessor::allHostsWithStatus( return Status::Error("Cant't find any partitions"); } while (iter->valid()) { - auto key = iter->key(); - PartitionID partId; - memcpy(&partId, key.data() + partPrefix.size(), sizeof(PartitionID)); + PartitionID partId = MetaServiceUtils::parsePartKeyPartId(iter->key()); auto partHosts = MetaServiceUtils::parsePartVal(iter->val()); for (auto& host : partHosts) { hostParts[HostAddr(host.ip, host.port)].emplace_back(partId); @@ -176,7 +172,6 @@ void ListHostsProcessor::getLeaderDist( } } - } // namespace meta } // namespace nebula diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveHostsManTest.cpp index db7203d7522..7365258c94e 100644 --- a/src/meta/test/ActiveHostsManTest.cpp +++ b/src/meta/test/ActiveHostsManTest.cpp @@ -52,6 +52,29 @@ TEST(ActiveHostsManTest, NormalTest) { ASSERT_EQ(1, ActiveHostsMan::getActiveHosts(kv.get()).size()); } +TEST(LastUpdateTimeManTest, NormalTest) { + fs::TempDir rootPath("/tmp/LastUpdateTimeManTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + + ASSERT_EQ(0, LastUpdateTimeMan::get(kv.get())); + int64_t now = time::WallClock::fastNowInMilliSec(); + + LastUpdateTimeMan::update(kv.get(), now); + ASSERT_EQ(now, LastUpdateTimeMan::get(kv.get())); + LastUpdateTimeMan::update(kv.get(), now + 100); + ASSERT_EQ(now + 100, LastUpdateTimeMan::get(kv.get())); + + LastUpdateTimeMan::update(kv.get(), now - 100); + { + auto key = MetaServiceUtils::lastUpdateTimeKey(); + std::string val; + auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val); + ASSERT_EQ(ret, kvstore::ResultCode::SUCCEEDED); + int64_t lastUpdateTime = *reinterpret_cast(val.data()); + ASSERT_EQ(now - 100, lastUpdateTime); + } +} + } // namespace meta } // namespace nebula @@ -63,4 +86,3 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } - diff --git a/src/meta/test/BalanceIntegrationTest.cpp b/src/meta/test/BalanceIntegrationTest.cpp index 14c36a1c7cf..e1e250ee8a0 100644 --- a/src/meta/test/BalanceIntegrationTest.cpp +++ b/src/meta/test/BalanceIntegrationTest.cpp @@ -15,7 +15,6 @@ #include "storage/test/TestUtils.h" #include "dataman/RowWriter.h" -DECLARE_int32(load_data_interval_secs); DECLARE_int32(heartbeat_interval_secs); DECLARE_uint32(raft_heartbeat_interval_secs); DECLARE_int32(expired_threshold_sec); @@ -24,7 +23,6 @@ namespace nebula { namespace meta { TEST(BalanceIntegrationTest, BalanceTest) { - FLAGS_load_data_interval_secs = 1; FLAGS_heartbeat_interval_secs = 1; FLAGS_raft_heartbeat_interval_secs = 1; FLAGS_expired_threshold_sec = 3; @@ -49,8 +47,11 @@ TEST(BalanceIntegrationTest, BalanceTest) { LOG(INFO) << "Create meta client..."; uint32_t tempDataPort = network::NetworkUtils::getAvailablePort(); HostAddr tempDataAddr(localIp, tempDataPort); - auto mClient = std::make_unique(threadPool, metaAddr, tempDataAddr, - kClusterId, false); + auto mClient = std::make_unique(threadPool, + metaAddr, + tempDataAddr, + kClusterId, + false); mClient->waitForMetadReady(); @@ -69,8 +70,11 @@ TEST(BalanceIntegrationTest, BalanceTest) { VLOG(1) << "The storage server has been added to the meta service"; - auto metaClient = std::make_shared(threadPool, metaAddr, storageAddr, - kClusterId, true); + auto metaClient = std::make_shared(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); metaClient->waitForMetadReady(); metaClients.emplace_back(metaClient); } @@ -102,7 +106,7 @@ TEST(BalanceIntegrationTest, BalanceTest) { auto tagRet = mClient->createTagSchema(spaceId, "tag", std::move(schema)).get(); ASSERT_TRUE(tagRet.ok()); auto tagId = tagRet.value(); - sleep(FLAGS_load_data_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); LOG(INFO) << "Let's write some data"; auto sClient = std::make_unique(threadPool, mClient.get()); @@ -173,8 +177,11 @@ TEST(BalanceIntegrationTest, BalanceTest) { uint32_t storagePort = network::NetworkUtils::getAvailablePort(); HostAddr storageAddr(localIp, storagePort); { - newMetaClient = std::make_unique(threadPool, metaAddr, storageAddr, - kClusterId, true); + newMetaClient = std::make_unique(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); newMetaClient->waitForMetadReady(); std::string dataPath = folly::stringPrintf("%s/%d/data", rootPath.path(), replica + 1); newServer = storage::TestUtils::mockStorageServer(newMetaClient.get(), @@ -246,7 +253,6 @@ TEST(BalanceIntegrationTest, BalanceTest) { } TEST(BalanceIntegrationTest, LeaderBalanceTest) { - FLAGS_load_data_interval_secs = 1; FLAGS_heartbeat_interval_secs = 1; FLAGS_raft_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/balance_integration_test.XXXXXX"); @@ -270,8 +276,11 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) { LOG(INFO) << "Create meta client..."; uint32_t tempDataPort = network::NetworkUtils::getAvailablePort(); HostAddr tempDataAddr(localIp, tempDataPort); - auto mClient = std::make_unique(threadPool, metaAddr, tempDataAddr, - kClusterId, false); + auto mClient = std::make_unique(threadPool, + metaAddr, + tempDataAddr, + kClusterId, + false); mClient->waitForMetadReady(); @@ -290,8 +299,11 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) { VLOG(1) << "The storage server has been added to the meta service"; - auto metaClient = std::make_shared(threadPool, metaAddr, storageAddr, - kClusterId, true); + auto metaClient = std::make_shared(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); metaClient->waitForMetadReady(); metaClients.emplace_back(metaClient); } @@ -308,7 +320,7 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) { auto ret = mClient->createSpace("storage", partition, replica).get(); ASSERT_TRUE(ret.ok()); - sleep(FLAGS_load_data_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); auto code = balancer.leaderBalance(); ASSERT_EQ(code, cpp2::ErrorCode::SUCCEEDED); @@ -336,4 +348,3 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } - diff --git a/src/meta/test/ConfigManTest.cpp b/src/meta/test/ConfigManTest.cpp index 79e4ce18646..cb08ddce118 100644 --- a/src/meta/test/ConfigManTest.cpp +++ b/src/meta/test/ConfigManTest.cpp @@ -21,7 +21,7 @@ #include "storage/test/TestUtils.h" #include "rocksdb/utilities/options_util.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); DECLARE_string(rocksdb_db_options); DECLARE_string(rocksdb_column_family_options); @@ -265,7 +265,7 @@ ConfigItem toConfigItem(const cpp2::ConfigItem& item) { } TEST(ConfigManTest, MetaConfigManTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaConfigManTest.XXXXXX"); uint32_t localMetaPort = 0; auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); @@ -313,7 +313,7 @@ TEST(ConfigManTest, MetaConfigManTest) { std::string name = "not_existed"; auto type = cpp2::ConfigType::INT64; - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); // get/set without register auto setRet = cfgMan.setConfig(module, name, type, 101l).get(); ASSERT_FALSE(setRet.ok()); @@ -336,7 +336,7 @@ TEST(ConfigManTest, MetaConfigManTest) { auto value = boost::get(item.value_); ASSERT_EQ(value, 100); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_int64_key_immutable, 100); } // mutable config @@ -357,7 +357,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, 102); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_int64_key, 102); } { @@ -377,7 +377,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, true); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_bool_key, true); } { @@ -397,7 +397,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, 3.14); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_double_key, 3.14); } { @@ -418,7 +418,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, "abc"); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_string_key, "abc"); } { @@ -446,7 +446,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(val, "8"); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); confRet = conf.parseFromString(FLAGS_nested_key); ASSERT_TRUE(confRet.ok()); status = conf.fetchAsString("max_background_compactions", val); @@ -461,7 +461,7 @@ TEST(ConfigManTest, MetaConfigManTest) { } TEST(ConfigManTest, MockConfigTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MockConfigTest.XXXXXX"); uint32_t localMetaPort = 0; auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); @@ -511,7 +511,7 @@ TEST(ConfigManTest, MockConfigTest) { } // check values in ClientBaseGflagsManager - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_test0, "updated0"); ASSERT_EQ(FLAGS_test1, "updated1"); ASSERT_EQ(FLAGS_test2, "updated2"); @@ -520,7 +520,7 @@ TEST(ConfigManTest, MockConfigTest) { } TEST(ConfigManTest, RocksdbOptionsTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/RocksdbOptionsTest.XXXXXX"); IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -542,8 +542,11 @@ TEST(ConfigManTest, RocksdbOptionsTest) { LOG(INFO) << "Create meta client..."; uint32_t storagePort = network::NetworkUtils::getAvailablePort(); HostAddr storageAddr(localIp, storagePort); - auto mClient = std::make_unique(threadPool, metaAddr, storageAddr, - kClusterId, true); + auto mClient = std::make_unique(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); mClient->waitForMetadReady(); mClient->gflagsModule_ = module; @@ -576,7 +579,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) { auto ret = mClient->createSpace("storage", 9, 1).get(); ASSERT_TRUE(ret.ok()); auto spaceId = ret.value(); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); storage::TestUtils::waitUntilAllElected(sc->kvStore_.get(), spaceId, 9); { std::string name = "rocksdb_db_options"; @@ -591,7 +594,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) { auto item = getRet.value().front(); auto value = boost::get(item.get_value()); - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); ASSERT_EQ(FLAGS_rocksdb_db_options, value); } { @@ -608,12 +611,12 @@ TEST(ConfigManTest, RocksdbOptionsTest) { auto item = getRet.value().front(); auto value = boost::get(item.get_value()); - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); ASSERT_EQ(FLAGS_rocksdb_column_family_options, value); } { // need to sleep a bit to take effect on rocksdb - sleep(3); + sleep(FLAGS_heartbeat_interval_secs + 2); rocksdb::DBOptions loadedDbOpt; std::vector loadedCfDescs; std::string rocksPath = folly::stringPrintf("%s/disk1/nebula/%d/data", diff --git a/src/meta/test/HBProcessorTest.cpp b/src/meta/test/HBProcessorTest.cpp index 6eda99c2e0b..e654d99a707 100644 --- a/src/meta/test/HBProcessorTest.cpp +++ b/src/meta/test/HBProcessorTest.cpp @@ -17,9 +17,6 @@ DECLARE_bool(hosts_whitelist_enabled); namespace nebula { namespace meta { -using nebula::cpp2::SupportedType; -using apache::thrift::FragileConstructor::FRAGILE; - TEST(HBProcessorTest, HBTest) { fs::TempDir rootPath("/tmp/HBTest.XXXXXX"); std::unique_ptr kv(TestUtils::initKV(rootPath.path())); @@ -27,6 +24,7 @@ TEST(HBProcessorTest, HBTest) { { for (auto i = 0; i < 5; i++) { cpp2::HBReq req; + req.set_in_storaged(true); nebula::cpp2::HostAddr thriftHost; thriftHost.set_ip(i); thriftHost.set_port(i); @@ -69,4 +67,3 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } - diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index 23acbfe26c6..b4a636c4809 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -17,7 +17,6 @@ #include "meta/test/TestUtils.h" #include "meta/ClientBasedGflagsManager.h" -DECLARE_int32(load_data_interval_secs); DECLARE_int32(heartbeat_interval_secs); DECLARE_string(rocksdb_db_options); @@ -31,7 +30,7 @@ using nebula::cpp2::ValueType; using apache::thrift::FragileConstructor::FRAGILE; TEST(MetaClientTest, InterfacesTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); // Let the system choose an available port for us @@ -46,7 +45,9 @@ TEST(MetaClientTest, InterfacesTest) { HostAddr localHost{localIp, clientPort}; auto client = std::make_shared(threadPool, std::vector{HostAddr(localIp, sc->port_)}, - localHost); + localHost, + 0, + false); client->waitForMetadReady(); { // Add hosts automatically, then testing listHosts interface. @@ -148,7 +149,7 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_EQ(ret1.value().begin()->schema.columns.size(), 5); // getTagSchemaFromCache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); auto ret = client->getNewestTagVerFromCache(spaceId, ret1.value().begin()->tag_id); CHECK(ret.ok()); @@ -221,7 +222,7 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_STREQ("edgeItem0", outSchema1->getFieldName(0)); } } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); { // Test cache interfaces auto partsMap = client->getPartsMapFromCache(HostAddr(0, 0)); @@ -320,7 +321,7 @@ TEST(MetaClientTest, InterfacesTest) { } TEST(MetaClientTest, TagTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTagTest.XXXXXX"); // Let the system choose an available port for us @@ -332,7 +333,7 @@ TEST(MetaClientTest, TagTest) { IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); auto localhosts = std::vector{HostAddr(localIp, sc->port_)}; - auto client = std::make_shared(threadPool, localhosts); + auto client = std::make_shared(threadPool, localhosts, HostAddr(0, 0), 0, false); std::vector hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}}; client->waitForMetadReady(); TestUtils::registerHB(sc->kvStore_.get(), hosts); @@ -430,7 +431,7 @@ TEST(MetaClientTest, TagTest) { } TEST(MetaClientTest, EdgeTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientEdgeTest.XXXXXX"); // Let the system choose an available port for us @@ -547,6 +548,12 @@ class TestListener : public MetaChangedListener { partChanged++; } + void fetchLeaderInfo(std::unordered_map>& leaderIds) override { + LOG(INFO) << "Get leader distribution!"; + UNUSED(leaderIds); + } + HostAddr getLocalHost() { return HostAddr(0, 0); } @@ -558,7 +565,7 @@ class TestListener : public MetaChangedListener { }; TEST(MetaClientTest, DiffTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); // Let the system choose an available port for us @@ -570,7 +577,10 @@ TEST(MetaClientTest, DiffTest) { network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); auto listener = std::make_unique(); auto client = std::make_shared(threadPool, - std::vector{HostAddr(localIp, sc->port_)}); + std::vector{HostAddr(localIp, sc->port_)}, + HostAddr(0, 0), + 0, + false); client->waitForMetadReady(); client->registerListener(listener.get()); { @@ -590,14 +600,14 @@ TEST(MetaClientTest, DiffTest) { auto ret = client->createSpace("default_space", 9, 1).get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(1, listener->spaceNum); ASSERT_EQ(9, listener->partNum); { auto ret = client->createSpace("default_space_1", 5, 1).get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(2, listener->spaceNum); ASSERT_EQ(14, listener->partNum); { @@ -605,13 +615,12 @@ TEST(MetaClientTest, DiffTest) { auto ret = client->dropSpace("default_space_1").get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(1, listener->spaceNum); ASSERT_EQ(9, listener->partNum); } TEST(MetaClientTest, HeartbeatTest) { - FLAGS_load_data_interval_secs = 5; FLAGS_heartbeat_interval_secs = 1; const nebula::ClusterID kClusterId = 10; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); @@ -628,7 +637,7 @@ TEST(MetaClientTest, HeartbeatTest) { std::vector{HostAddr(localIp, 10001)}, localHost, kClusterId, - true); // send heartbeat + true); client->waitForMetadReady(); client->registerListener(listener.get()); { @@ -695,6 +704,7 @@ class TestMetaServiceRetry : public cpp2::MetaServiceSvIf { }; TEST(MetaClientTest, SimpleTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -720,6 +730,7 @@ TEST(MetaClientTest, SimpleTest) { } TEST(MetaClientTest, RetryWithExceptionTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -742,6 +753,7 @@ TEST(MetaClientTest, RetryWithExceptionTest) { } TEST(MetaClientTest, RetryOnceTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -781,6 +793,7 @@ TEST(MetaClientTest, RetryOnceTest) { } TEST(MetaClientTest, RetryUntilLimitTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -819,7 +832,7 @@ TEST(MetaClientTest, RetryUntilLimitTest) { } TEST(MetaClientTest, RocksdbOptionsTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/RocksdbOptionsTest.XXXXXX"); uint32_t localMetaPort = 0; auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); @@ -855,7 +868,7 @@ TEST(MetaClientTest, RocksdbOptionsTest) { std::vector hosts = {{0, 0}}; TestUtils::registerHB(sc->kvStore_.get(), hosts); client->createSpace("default_space", 9, 1).get(); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); } { std::string name = "rocksdb_db_options"; @@ -871,7 +884,7 @@ TEST(MetaClientTest, RocksdbOptionsTest) { auto item = getRet.value().front(); auto value = boost::get(item.get_value()); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_rocksdb_db_options, value); ASSERT_EQ(listener->options["disable_auto_compactions"], "true"); ASSERT_EQ(listener->options["level0_file_num_compaction_trigger"], "4"); diff --git a/src/meta/test/MetaHttpDownloadHandlerTest.cpp b/src/meta/test/MetaHttpDownloadHandlerTest.cpp index c6cfa5f0a9b..2054525cd7e 100644 --- a/src/meta/test/MetaHttpDownloadHandlerTest.cpp +++ b/src/meta/test/MetaHttpDownloadHandlerTest.cpp @@ -14,7 +14,6 @@ #include "storage/StorageHttpDownloadHandler.h" #include "fs/TempDir.h" -DECLARE_int32(load_data_interval_secs); DECLARE_string(pid_file); DECLARE_int32(ws_storage_http_port); diff --git a/src/meta/test/MetaHttpStatusHandlerTest.cpp b/src/meta/test/MetaHttpStatusHandlerTest.cpp index 830ed25c3d3..6e3257187d5 100644 --- a/src/meta/test/MetaHttpStatusHandlerTest.cpp +++ b/src/meta/test/MetaHttpStatusHandlerTest.cpp @@ -13,7 +13,6 @@ #include "meta/test/TestUtils.h" #include "fs/TempDir.h" -DECLARE_int32(load_data_interval_secs); DECLARE_string(pid_file); namespace nebula { diff --git a/src/meta/test/MetaServiceUtilsTest.cpp b/src/meta/test/MetaServiceUtilsTest.cpp index 4ddb02a3d9c..78befca356b 100644 --- a/src/meta/test/MetaServiceUtilsTest.cpp +++ b/src/meta/test/MetaServiceUtilsTest.cpp @@ -112,4 +112,3 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } - diff --git a/src/storage/UpdateEdgeProcessor.cpp b/src/storage/UpdateEdgeProcessor.cpp index c3b3217f2f2..6b4b4b5519b 100644 --- a/src/storage/UpdateEdgeProcessor.cpp +++ b/src/storage/UpdateEdgeProcessor.cpp @@ -339,7 +339,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { this->spaceId_ = req.get_space_id(); insertable_ = req.get_insertable(); auto partId = req.get_part_id(); - auto edgeKey = std::move(req).get_edge_key(); + auto edgeKey = req.get_edge_key(); std::vector eTypes; eTypes.emplace_back(edgeKey.get_edge_type()); this->initEdgeContext(eTypes); @@ -350,7 +350,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { this->onFinished(); return; } - updateItems_ = std::move(req).get_update_items(); + updateItems_ = req.get_update_items(); VLOG(3) << "Update edge, spaceId: " << this->spaceId_ << ", partId: " << partId << ", src: " << edgeKey.get_src() << ", edge_type: " << edgeKey.get_edge_type() diff --git a/src/storage/UpdateVertexProcessor.cpp b/src/storage/UpdateVertexProcessor.cpp index 5ea5663d6ba..baed78d1bd2 100644 --- a/src/storage/UpdateVertexProcessor.cpp +++ b/src/storage/UpdateVertexProcessor.cpp @@ -328,7 +328,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) { return; } auto vId = req.get_vertex_id(); - updateItems_ = std::move(req).get_update_items(); + updateItems_ = req.get_update_items(); VLOG(3) << "Update vertex, spaceId: " << this->spaceId_ << ", partId: " << partId << ", vId: " << vId; diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 4b8cc7e9b79..68b34e7da11 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -16,14 +16,12 @@ #include "network/NetworkUtils.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_secs); DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace storage { TEST(StorageClientTest, VerticesInterfacesTest) { - FLAGS_load_data_interval_secs = 1; FLAGS_heartbeat_interval_secs = 1; const nebula::ClusterID kClusterId = 10; fs::TempDir rootPath("/tmp/StorageClientTest.XXXXXX"); @@ -75,7 +73,7 @@ TEST(StorageClientTest, VerticesInterfacesTest) { ASSERT_TRUE(ret.ok()) << ret.status(); spaceId = ret.value(); LOG(INFO) << "Created space \"default\", its id is " << spaceId; - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); TestUtils::waitUntilAllElected(sc->kvStore_.get(), spaceId, 10); auto client = std::make_unique(threadPool, mClient.get()); diff --git a/src/storage/test/StorageHttpDownloadHandlerTest.cpp b/src/storage/test/StorageHttpDownloadHandlerTest.cpp index 7cd254321e5..873ca796c5d 100644 --- a/src/storage/test/StorageHttpDownloadHandlerTest.cpp +++ b/src/storage/test/StorageHttpDownloadHandlerTest.cpp @@ -14,7 +14,6 @@ #include "fs/TempDir.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_secs); namespace nebula { namespace storage { diff --git a/src/storage/test/StorageHttpStatusHandlerTest.cpp b/src/storage/test/StorageHttpStatusHandlerTest.cpp index dfabf6061db..abaa3a04101 100644 --- a/src/storage/test/StorageHttpStatusHandlerTest.cpp +++ b/src/storage/test/StorageHttpStatusHandlerTest.cpp @@ -15,7 +15,6 @@ #include "http/HttpClient.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_secs); namespace nebula { namespace storage { diff --git a/src/tools/storage-perf/StorageIntegrityTool.cpp b/src/tools/storage-perf/StorageIntegrityTool.cpp index 7608d5b5614..6a3e658fafe 100644 --- a/src/tools/storage-perf/StorageIntegrityTool.cpp +++ b/src/tools/storage-perf/StorageIntegrityTool.cpp @@ -22,7 +22,7 @@ DEFINE_int64(first_vertex_id, 1, "The smallest vertex id"); DEFINE_uint64(width, 100, "width of matrix"); DEFINE_uint64(height, 1000, "height of matrix"); -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace storage { @@ -38,7 +38,7 @@ namespace storage { * update it after we suppport preheat. The tag must have only one int property, * which is prop_name. * 2. If the space and tag doesn't exists, it will try to create one, maybe you need to set - * load_data_interval_secs to make sure the storage service has load meta. + * heartbeat_interval_secs to make sure the storage service has load meta. * 3. The width and height is the size of the big linked list, you can refer to the graph below. * As expected, we can traverse the big linked list after width * height steps starting * from any node in the list. @@ -66,7 +66,7 @@ class IntegrityTest { private: bool init() { - FLAGS_load_data_interval_secs = 10; + FLAGS_heartbeat_interval_secs = 10; auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs); if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) { LOG(ERROR) << "Can't get metaServer address, status: " << metaAddrsRet.status() @@ -95,7 +95,7 @@ class IntegrityTest { auto tagResult = mClient_->getTagIDByNameFromCache(spaceId_, FLAGS_tag_name); if (!tagResult.ok()) { - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); LOG(ERROR) << "Get tagId failed, try to create one: " << tagResult.status(); nebula::cpp2::Schema schema; nebula::cpp2::ColumnDef column;