diff --git a/src/graph/test/ConfigTest.cpp b/src/graph/test/ConfigTest.cpp index 532134223a2..c3d76acd31f 100644 --- a/src/graph/test/ConfigTest.cpp +++ b/src/graph/test/ConfigTest.cpp @@ -12,8 +12,6 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { diff --git a/src/graph/test/DataTest.cpp b/src/graph/test/DataTest.cpp index 5ecbce2fdd3..049de1f1590 100644 --- a/src/graph/test/DataTest.cpp +++ b/src/graph/test/DataTest.cpp @@ -9,7 +9,7 @@ #include "graph/test/TestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -169,7 +169,7 @@ AssertionResult DataTest::prepareSchema() { << " failed, error code "<< static_cast(code); } } - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); return TestOK(); } @@ -1023,7 +1023,7 @@ class FetchEmptyPropsTest : public ::testing::Test { execute(client_.get(), q); } - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); } void prepareData() { diff --git a/src/graph/test/DeleteEdgesTest.cpp b/src/graph/test/DeleteEdgesTest.cpp index eb0db7e6e2f..ad844c28d86 100644 --- a/src/graph/test/DeleteEdgesTest.cpp +++ b/src/graph/test/DeleteEdgesTest.cpp @@ -9,7 +9,7 @@ #include "graph/test/TestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -58,7 +58,7 @@ AssertionResult DeleteEdgesTest::prepareSchema() { return TestError() << "Do cmd:" << cmd << " failed, code:" << static_cast(code); } } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); { cpp2::ExecutionResponse resp; std::string cmd = "USE mySpace"; @@ -99,7 +99,7 @@ AssertionResult DeleteEdgesTest::prepareSchema() { return TestError() << "Do cmd:" << cmd << " failed, code:" << static_cast(code); } } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); return TestOK(); } diff --git a/src/graph/test/GroupByLimitTest.cpp b/src/graph/test/GroupByLimitTest.cpp index 7dbdc52da76..157441b2a29 100644 --- a/src/graph/test/GroupByLimitTest.cpp +++ b/src/graph/test/GroupByLimitTest.cpp @@ -10,8 +10,6 @@ #include "graph/test/TraverseTestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { diff --git a/src/graph/test/SchemaTest.cpp b/src/graph/test/SchemaTest.cpp index 466e5b3a344..90a5688aa8b 100644 --- a/src/graph/test/SchemaTest.cpp +++ b/src/graph/test/SchemaTest.cpp @@ -10,8 +10,6 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { @@ -762,7 +760,6 @@ TEST_F(SchemaTest, metaCommunication) { ASSERT_EQ(1, (*(resp.get_rows())).size()); } - sleep(FLAGS_load_data_interval_secs + 1); int retry = 60; while (retry-- > 0) { auto spaceResult = gEnv->metaClient()->getSpaceIdByNameFromCache("default_space"); diff --git a/src/graph/test/SnapshotCommandTest.cpp b/src/graph/test/SnapshotCommandTest.cpp index 7929150152d..cea4619c144 100644 --- a/src/graph/test/SnapshotCommandTest.cpp +++ b/src/graph/test/SnapshotCommandTest.cpp @@ -9,7 +9,7 @@ #include "graph/test/TestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -37,7 +37,7 @@ TEST_F(SnapshotCommandTest, TestSnapshot) { auto code = client->execute(cmd, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_NE(nullptr, client); { diff --git a/src/graph/test/TestEnv.cpp b/src/graph/test/TestEnv.cpp index 118affc1010..e591bf995b2 100644 --- a/src/graph/test/TestEnv.cpp +++ b/src/graph/test/TestEnv.cpp @@ -9,7 +9,7 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); DECLARE_string(meta_server_addrs); namespace nebula { @@ -27,7 +27,7 @@ TestEnv::~TestEnv() { void TestEnv::SetUp() { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; const nebula::ClusterID kClusterId = 10; // Create metaServer metaServer_ = nebula::meta::TestUtils::mockMetaServer( @@ -72,7 +72,7 @@ void TestEnv::SetUp() { void TestEnv::TearDown() { // TO make sure the drop space be invoked on storage server - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); graphServer_.reset(); storageServer_.reset(); mClient_.reset(); diff --git a/src/graph/test/TraverseTestBase.h b/src/graph/test/TraverseTestBase.h index 8cc2285ac52..4125e1dbb58 100644 --- a/src/graph/test/TraverseTestBase.h +++ b/src/graph/test/TraverseTestBase.h @@ -13,7 +13,7 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -433,7 +433,7 @@ AssertionResult TraverseTestBase::prepareSchema() { return TestError() << "Do cmd:" << cmd << " failed"; } } - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); return TestOK(); } diff --git a/src/graph/test/UpdateTest.cpp b/src/graph/test/UpdateTest.cpp index d7b102843c6..6e3129be203 100644 --- a/src/graph/test/UpdateTest.cpp +++ b/src/graph/test/UpdateTest.cpp @@ -10,8 +10,6 @@ #include "graph/test/UpdateTestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { diff --git a/src/graph/test/UpdateTestBase.h b/src/graph/test/UpdateTestBase.h index 7eefc7aed06..d5e92120a57 100644 --- a/src/graph/test/UpdateTestBase.h +++ b/src/graph/test/UpdateTestBase.h @@ -12,7 +12,7 @@ #include "graph/test/TestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -118,7 +118,7 @@ AssertionResult UpdateTestBase::prepareSchema() { return TestError() << "Do cmd:" << cmd << " failed"; } } - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); return TestOK(); } diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index a7df591caaf..aa67f148311 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -392,11 +392,14 @@ struct HBResp { 1: ErrorCode code, 2: common.HostAddr leader, 3: common.ClusterID cluster_id, + 4: i64 last_update_time_in_ms, } struct HBReq { - 1: common.HostAddr host, - 2: common.ClusterID cluster_id, + 1: bool in_storaged, + 2: common.HostAddr host, + 3: common.ClusterID cluster_id, + 4: optional map> (cpp.template = "std::unordered_map") leader_partIds; } struct CreateTagIndexReq { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 44c6b4920ac..6dd6abc41f7 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -173,11 +173,10 @@ class NebulaStore : public KVStore, public Handler { ResultCode setWriteBlocking(GraphSpaceID spaceId, bool sign) override; - int32_t allLeader(std::unordered_map>& leaderIds) override; - bool isLeader(GraphSpaceID spaceId, PartitionID partId); + ErrorOr> space(GraphSpaceID spaceId); + /** * Implement four interfaces in Handler. * */ @@ -189,7 +188,8 @@ class NebulaStore : public KVStore, public Handler { void removePart(GraphSpaceID spaceId, PartitionID partId) override; - ErrorOr> space(GraphSpaceID spaceId); + int32_t allLeader(std::unordered_map>& leaderIds) override; private: void updateSpaceOption(GraphSpaceID spaceId, diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp index 381ed9a343d..57aff978f83 100644 --- a/src/kvstore/PartManager.cpp +++ b/src/kvstore/PartManager.cpp @@ -166,5 +166,14 @@ void MetaServerBasedPartManager::onPartUpdated(const PartMeta& partMeta) { UNUSED(partMeta); } +void MetaServerBasedPartManager::fetchLeaderInfo( + std::unordered_map>& leaderIds) { + if (handler_ != nullptr) { + handler_->allLeader(leaderIds); + } else { + VLOG(1) << "handler_ is nullptr!"; + } +} + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index 13a9552dbea..b7e0b24161c 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -24,6 +24,8 @@ class Handler { bool isDbOption) = 0; virtual void removeSpace(GraphSpaceID spaceId) = 0; virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0; + virtual int32_t allLeader(std::unordered_map>& leaderIds) = 0; }; @@ -167,6 +169,9 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL void onPartUpdated(const PartMeta& partMeta) override; + void fetchLeaderInfo(std::unordered_map>& leaderParts) override; + HostAddr getLocalHost() { return localHost_; } diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 7021f6ce42f..a5bd6610266 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -59,5 +59,34 @@ bool ActiveHostsMan::isLived(kvstore::KVStore* kv, const HostAddr& host) { return std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end(); } +kvstore::ResultCode LastUpdateTimeMan::update(kvstore::KVStore* kv, const int64_t timeInMilliSec) { + CHECK_NOTNULL(kv); + std::vector data; + data.emplace_back(MetaServiceUtils::lastUpdateTimeKey(), + MetaServiceUtils::lastUpdateTimeVal(timeInMilliSec)); + + folly::SharedMutex::WriteHolder wHolder(LockUtils::lastUpdateTimeLock()); + folly::Baton baton; + kvstore::ResultCode ret; + kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), + [&] (kvstore::ResultCode code) { + ret = code; + baton.post(); + }); + baton.wait(); + return ret; +} + +int64_t LastUpdateTimeMan::get(kvstore::KVStore* kv) { + CHECK_NOTNULL(kv); + auto key = MetaServiceUtils::lastUpdateTimeKey(); + std::string val; + auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val); + if (ret == kvstore::ResultCode::SUCCEEDED) { + return *reinterpret_cast(val.data()); + } + return 0; +} + } // namespace meta } // namespace nebula diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index 8f3a06198bd..4bda8ff0b9a 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -59,6 +59,18 @@ class ActiveHostsMan final { ActiveHostsMan() = default; }; +class LastUpdateTimeMan final { +public: + ~LastUpdateTimeMan() = default; + + static kvstore::ResultCode update(kvstore::KVStore* kv, const int64_t timeInMilliSec); + + static int64_t get(kvstore::KVStore* kv); + +protected: + LastUpdateTimeMan() = default; +}; + } // namespace meta } // namespace nebula diff --git a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp index 98a9ccce13d..67b7037568f 100644 --- a/src/meta/MetaServiceUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -11,23 +11,38 @@ namespace nebula { namespace meta { -const std::string kSpacesTable = "__spaces__"; // NOLINT -const std::string kPartsTable = "__parts__"; // NOLINT -const std::string kHostsTable = "__hosts__"; // NOLINT -const std::string kTagsTable = "__tags__"; // NOLINT -const std::string kEdgesTable = "__edges__"; // NOLINT -const std::string kTagIndexesTable = "__tag_indexes__"; // NOLINT -const std::string kEdgeIndexesTable = "__edge_indexes__"; // NOLINT -const std::string kIndexTable = "__index__"; // NOLINT -const std::string kUsersTable = "__users__"; // NOLINT -const std::string kRolesTable = "__roles__"; // NOLINT -const std::string kConfigsTable = "__configs__"; // NOLINT -const std::string kDefaultTable = "__default__"; // NOLINT -const std::string kSnapshotsTable = "__snapshots__"; // NOLINT +const std::string kSpacesTable = "__spaces__"; // NOLINT +const std::string kPartsTable = "__parts__"; // NOLINT +const std::string kHostsTable = "__hosts__"; // NOLINT +const std::string kTagsTable = "__tags__"; // NOLINT +const std::string kEdgesTable = "__edges__"; // NOLINT +const std::string kTagIndexesTable = "__tag_indexes__"; // NOLINT +const std::string kEdgeIndexesTable = "__edge_indexes__"; // NOLINT +const std::string kIndexTable = "__index__"; // NOLINT +const std::string kUsersTable = "__users__"; // NOLINT +const std::string kRolesTable = "__roles__"; // NOLINT +const std::string kConfigsTable = "__configs__"; // NOLINT +const std::string kDefaultTable = "__default__"; // NOLINT +const std::string kSnapshotsTable = "__snapshots__"; // NOLINT +const std::string kLastUpdateTimeTable = "__last_update_time__"; // NOLINT const std::string kHostOnline = "Online"; // NOLINT const std::string kHostOffline = "Offline"; // NOLINT +std::string MetaServiceUtils::lastUpdateTimeKey() { + std::string key; + key.reserve(128); + key.append(kLastUpdateTimeTable.data(), kLastUpdateTimeTable.size()); + return key; +} + +std::string MetaServiceUtils::lastUpdateTimeVal(const int64_t timeInMilliSec) { + std::string val; + val.reserve(sizeof(int64_t)); + val.append(reinterpret_cast(&timeInMilliSec), sizeof(int64_t)); + return val; +} + std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) { std::string key; key.reserve(128); @@ -42,7 +57,7 @@ std::string MetaServiceUtils::spaceVal(const cpp2::SpaceProperties &properties) return val; } -cpp2::SpaceProperties MetaServiceUtils::parseSpace(folly::StringPiece rawData) { +cpp2::SpaceProperties MetaServiceUtils::parseSpace(folly::StringPiece rawData) { cpp2::SpaceProperties properties; apache::thrift::CompactSerializer::deserialize(rawData, properties); return properties; @@ -69,6 +84,16 @@ std::string MetaServiceUtils::partKey(GraphSpaceID spaceId, PartitionID partId) return key; } +GraphSpaceID MetaServiceUtils::parsePartKeySpaceId(folly::StringPiece key) { + return *reinterpret_cast(key.data() + kPartsTable.size()); +} + +PartitionID MetaServiceUtils::parsePartKeyPartId(folly::StringPiece key) { + return *reinterpret_cast(key.data() + + kPartsTable.size() + + sizeof(GraphSpaceID)); +} + std::string MetaServiceUtils::partVal(const std::vector& hosts) { std::string val; val.reserve(128); diff --git a/src/meta/MetaServiceUtils.h b/src/meta/MetaServiceUtils.h index 2b8e24125e3..0eba5c3d569 100644 --- a/src/meta/MetaServiceUtils.h +++ b/src/meta/MetaServiceUtils.h @@ -10,6 +10,7 @@ #include "base/Base.h" #include "base/Status.h" #include "interface/gen-cpp2/meta_types.h" +#include "meta/ActiveHostsMan.h" namespace nebula { namespace meta { @@ -30,6 +31,10 @@ class MetaServiceUtils final { public: MetaServiceUtils() = delete; + static std::string lastUpdateTimeKey(); + + static std::string lastUpdateTimeVal(const int64_t timeInMilliSec); + static std::string spaceKey(GraphSpaceID spaceId); static std::string spaceVal(const cpp2::SpaceProperties &properties); @@ -44,6 +49,10 @@ class MetaServiceUtils final { static std::string partKey(GraphSpaceID spaceId, PartitionID partId); + static GraphSpaceID parsePartKeySpaceId(folly::StringPiece key); + + static PartitionID parsePartKeyPartId(folly::StringPiece key); + static std::string partVal(const std::vector& hosts); static const std::string& partPrefix(); diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index ae73cdf6ca2..988771bbc3f 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -15,8 +15,7 @@ #include "stats/StatsManager.h" -DEFINE_int32(load_data_interval_secs, 1, "Load data interval"); -DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval"); +DEFINE_int32(heartbeat_interval_secs, 30, "Heartbeat interval"); DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry"); DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry"); DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout"); @@ -32,13 +31,13 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool std::vector addrs, HostAddr localHost, ClusterID clusterId, - bool sendHeartBeat, + bool inStoraged, const std::string &serviceName) : ioThreadPool_(ioThreadPool) , addrs_(std::move(addrs)) , localHost_(localHost) , clusterId_(clusterId) - , sendHeartBeat_(sendHeartBeat) { + , inStoraged_(inStoraged) { CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required"; CHECK(!addrs_.empty()) << "No meta server address is specified. Meta server is required"; @@ -60,17 +59,20 @@ MetaClient::~MetaClient() { bool MetaClient::isMetadReady() { - if (sendHeartBeat_) { - auto ret = heartbeat().get(); - if (!ret.ok() && ret.status() != Status::LeaderChanged()) { - LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); - ready_ = false; - return ready_; - } - } // end if - loadData(); + auto ret = heartbeat().get(); + if (!ret.ok() && ret.status() != Status::LeaderChanged()) { + LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); + ready_ = false; + return ready_; + } + + bool ldRet = loadData(); + bool lcRet = true; if (!FLAGS_local_config) { - loadCfg(); + lcRet = loadCfg(); + } + if (ldRet && lcRet) { + localLastUpdateTime_ = metadLastUpdateTime_; } return ready_; } @@ -93,17 +95,11 @@ bool MetaClient::waitForMetadReady(int count, int retryIntervalSecs) { } CHECK(bgThread_->start()); - if (sendHeartBeat_) { - LOG(INFO) << "Register time task for heartbeat!"; - size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900); - bgThread_->addTimerTask(delayMS, - FLAGS_heartbeat_interval_secs * 1000, - &MetaClient::heartBeatThreadFunc, this); - } - addLoadDataTask(); - if (!FLAGS_local_config) { - addLoadCfgTask(); - } + LOG(INFO) << "Register time task for heartbeat!"; + size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900); + bgThread_->addTimerTask(delayMS, + FLAGS_heartbeat_interval_secs * 1000, + &MetaClient::heartBeatThreadFunc, this); return ready_; } @@ -122,11 +118,18 @@ void MetaClient::heartBeatThreadFunc() { LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); return; } -} -void MetaClient::loadDataThreadFunc() { - loadData(); - addLoadDataTask(); + // if MetaServer has some changes, refesh the localCache_ + if (localLastUpdateTime_ < metadLastUpdateTime_) { + bool ldRet = loadData(); + bool lcRet = true; + if (!FLAGS_local_config) { + lcRet = loadCfg(); + } + if (ldRet && lcRet) { + localLastUpdateTime_ = metadLastUpdateTime_; + } + } } bool MetaClient::loadData() { @@ -198,13 +201,6 @@ bool MetaClient::loadData() { return true; } -void MetaClient::addLoadDataTask() { - size_t delayMS = FLAGS_load_data_interval_secs * 1000 + folly::Random::rand32(900); - bgThread_->addDelayTask(delayMS, &MetaClient::loadDataThreadFunc, this); - LOG(INFO) << "Load data completed, call after " << delayMS << " ms"; -} - - bool MetaClient::loadSchemas(GraphSpaceID spaceId, std::shared_ptr spaceInfoCache, SpaceTagNameIdMap &tagNameIdMap, @@ -1309,7 +1305,6 @@ StatusOr MetaClient::getNewestTagVerFromCache(const GraphSpaceID& spa return it->second; } - StatusOr MetaClient::getNewestEdgeVerFromCache(const GraphSpaceID& space, const EdgeType& edgeType) { if (!ready_) { @@ -1323,17 +1318,31 @@ StatusOr MetaClient::getNewestEdgeVerFromCache(const GraphSpaceID& sp return it->second; } - folly::Future> MetaClient::heartbeat() { if (clusterId_.load() == 0) { clusterId_ = ClusterIdMan::getClusterIdFromFile(FLAGS_cluster_id_path); } cpp2::HBReq req; + req.set_in_storaged(inStoraged_); nebula::cpp2::HostAddr thriftHost; thriftHost.set_ip(localHost_.first); thriftHost.set_port(localHost_.second); req.set_host(std::move(thriftHost)); req.set_cluster_id(clusterId_.load()); + + if (inStoraged_ && listener_ != nullptr) { + std::unordered_map> leaderIds; + listener_->fetchLeaderInfo(leaderIds); + if (leaderIds_ != leaderIds) { + { + folly::RWSpinLock::WriteHolder holder(leaderIdsLock_); + leaderIds_.clear(); + leaderIds_ = leaderIds; + } + req.set_leader_partIds(std::move(leaderIds)); + } + } + folly::Promise> promise; auto future = promise.getFuture(); VLOG(1) << "Send heartbeat to " << leader_ << ", clusterId " << req.get_cluster_id(); @@ -1350,7 +1359,9 @@ folly::Future> MetaClient::heartbeat() { << FLAGS_cluster_id_path; } } - return true; + metadLastUpdateTime_ = resp.get_last_update_time_in_ms(); + LOG(INFO) << "Metad last update time: " << metadLastUpdateTime_; + return true; // resp.code == cpp2::ErrorCode::SUCCEEDED }, std::move(promise), true); return future; } @@ -1566,11 +1577,6 @@ folly::Future>> MetaClient::listSnapshots() return future; } -void MetaClient::loadCfgThreadFunc() { - loadCfg(); - addLoadCfgTask(); -} - bool MetaClient::registerCfg() { auto ret = regConfig(gflagsDeclared_).get(); if (ret.ok()) { @@ -1580,9 +1586,9 @@ bool MetaClient::registerCfg() { return configReady_; } -void MetaClient::loadCfg() { +bool MetaClient::loadCfg() { if (!configReady_ && !registerCfg()) { - return; + return false; } // only load current module's config is enough auto ret = listConfigs(gflagsModule_).get(); @@ -1612,14 +1618,9 @@ void MetaClient::loadCfg() { } } else { LOG(ERROR) << "Load configs failed: " << ret.status(); - return; + return false; } -} - -void MetaClient::addLoadCfgTask() { - size_t delayMS = FLAGS_load_data_interval_secs * 1000 + folly::Random::rand32(900); - bgThread_->addDelayTask(delayMS, &MetaClient::loadCfgThreadFunc, this); - LOG(INFO) << "Load configs completed, call after " << delayMS << " ms"; + return true; } void MetaClient::updateGflagsValue(const ConfigItem& item) { diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index e78b650c261..14556470a3f 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -97,6 +97,8 @@ class MetaChangedListener { virtual void onPartAdded(const PartMeta& partMeta) = 0; virtual void onPartRemoved(GraphSpaceID spaceId, PartitionID partId) = 0; virtual void onPartUpdated(const PartMeta& partMeta) = 0; + virtual void fetchLeaderInfo(std::unordered_map>& leaderIds) = 0; }; class MetaClient { @@ -114,10 +116,9 @@ class MetaClient { std::vector addrs, HostAddr localHost = HostAddr(0, 0), ClusterID clusterId = 0, - bool sendHeartBeat = false, + bool inStoraged = true, const std::string &serviceName = ""); - virtual ~MetaClient(); bool isMetadReady(); @@ -348,18 +349,12 @@ class MetaClient { Status refreshCache(); protected: - void loadDataThreadFunc(); // Return true if load succeeded. bool loadData(); - - void addLoadDataTask(); - + bool loadCfg(); void heartBeatThreadFunc(); - void loadCfgThreadFunc(); - void loadCfg(); bool registerCfg(); - void addLoadCfgTask(); void updateGflagsValue(const ConfigItem& item); void updateNestedGflags(const std::string& name); @@ -425,6 +420,11 @@ class MetaClient { std::shared_ptr ioThreadPool_; std::shared_ptr> clientsMan_; + std::unordered_map> leaderIds_; + folly::RWSpinLock leaderIdsLock_; + int64_t localLastUpdateTime_{0}; + int64_t metadLastUpdateTime_{0}; + LocalCache localCache_; std::vector addrs_; // The lock used to protect active_ and leader_. @@ -440,13 +440,13 @@ class MetaClient { SpaceEdgeTypeNameMap spaceEdgeIndexByType_; SpaceNewestTagVerMap spaceNewestTagVerMap_; SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_; - SpaceAllEdgeMap spaceAllEdgeMap_; + SpaceAllEdgeMap spaceAllEdgeMap_; folly::RWSpinLock localCacheLock_; MetaChangedListener* listener_{nullptr}; folly::RWSpinLock listenerLock_; std::atomic clusterId_{0}; bool isRunning_{false}; - bool sendHeartBeat_{false}; + bool inStoraged_{true}; std::atomic_bool ready_{false}; MetaConfigMap metaConfigMap_; folly::RWSpinLock configCacheLock_; diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index 22ab9ac4599..0c1f4af3a7f 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -18,6 +18,7 @@ #include "meta/common/MetaCommon.h" #include "network/NetworkUtils.h" #include "meta/processors/Common.h" +#include "meta/ActiveHostsMan.h" #include "stats/Stats.h" namespace nebula { @@ -163,8 +164,7 @@ class BaseProcessor { /** * Remove keys from start to end, doesn't contain end. * */ - void doRemoveRange(const std::string& start, - const std::string& end); + void doRemoveRange(const std::string& start, const std::string& end); /** * Scan keys from start to end, doesn't contain end. diff --git a/src/meta/processors/Common.h b/src/meta/processors/Common.h index 3039fc1ed49..f78153606c1 100644 --- a/src/meta/processors/Common.h +++ b/src/meta/processors/Common.h @@ -26,6 +26,7 @@ class LockUtils { return l; \ } +GENERATE_LOCK(lastUpdateTime); GENERATE_LOCK(space); GENERATE_LOCK(id); GENERATE_LOCK(tag); diff --git a/src/meta/processors/admin/BalanceProcessor.cpp b/src/meta/processors/admin/BalanceProcessor.cpp index 879df0413eb..57fb8b320dd 100644 --- a/src/meta/processors/admin/BalanceProcessor.cpp +++ b/src/meta/processors/admin/BalanceProcessor.cpp @@ -90,6 +90,7 @@ void BalanceProcessor::process(const cpp2::BalanceReq& req) { } resp_.set_id(value(ret)); resp_.set_code(cpp2::ErrorCode::SUCCEEDED); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); onFinished(); } diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index a4db3ffebb8..716a1c7c942 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -15,7 +15,6 @@ DEFINE_bool(hosts_whitelist_enabled, false, "Check host whether in whitelist whe namespace nebula { namespace meta { - void HBProcessor::process(const cpp2::HBReq& req) { HostAddr host(req.host.ip, req.host.port); if (FLAGS_hosts_whitelist_enabled @@ -39,8 +38,16 @@ void HBProcessor::process(const cpp2::HBReq& req) { } LOG(INFO) << "Receive heartbeat from " << host; - HostInfo info(time::WallClock::fastNowInMilliSec()); - auto ret = ActiveHostsMan::updateHostInfo(kvstore_, host, info); + auto ret = kvstore::ResultCode::SUCCEEDED; + if (req.get_in_storaged()) { + HostInfo info(time::WallClock::fastNowInMilliSec()); + ret = ActiveHostsMan::updateHostInfo(kvstore_, host, info); + if (req.__isset.leader_partIds) { + const auto& leaderIds = req.get_leader_partIds(); + UNUSED(leaderIds); + // TODO(zhangguoqing) put the leader parts information into kvstore + } + } resp_.set_code(to(ret)); if (ret == kvstore::ResultCode::ERR_LEADER_CHANGED) { auto leaderRet = kvstore_->partLeader(kDefaultSpaceId, kDefaultPartId); @@ -48,6 +55,8 @@ void HBProcessor::process(const cpp2::HBReq& req) { resp_.set_leader(toThriftHost(nebula::value(leaderRet))); } } + int64_t lastUpdateTime = LastUpdateTimeMan::get(this->kvstore_); + resp_.set_last_update_time_in_ms(lastUpdateTime); onFinished(); } diff --git a/src/meta/processors/admin/LeaderBalanceProcessor.cpp b/src/meta/processors/admin/LeaderBalanceProcessor.cpp index 5c2076e6c3f..6cfab30fe32 100644 --- a/src/meta/processors/admin/LeaderBalanceProcessor.cpp +++ b/src/meta/processors/admin/LeaderBalanceProcessor.cpp @@ -15,6 +15,7 @@ void LeaderBalanceProcessor::process(const cpp2::LeaderBalanceReq& req) { UNUSED(req); auto ret = Balancer::instance(kvstore_)->leaderBalance(); resp_.set_code(ret); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); onFinished(); } diff --git a/src/meta/processors/configMan/RegConfigProcessor.cpp b/src/meta/processors/configMan/RegConfigProcessor.cpp index d6ce8a24b59..b3fd5b87564 100644 --- a/src/meta/processors/configMan/RegConfigProcessor.cpp +++ b/src/meta/processors/configMan/RegConfigProcessor.cpp @@ -30,6 +30,7 @@ void RegConfigProcessor::process(const cpp2::RegConfigReq& req) { } if (!data.empty()) { + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); doPut(std::move(data)); return; } diff --git a/src/meta/processors/configMan/SetConfigProcessor.cpp b/src/meta/processors/configMan/SetConfigProcessor.cpp index d81fabfd613..b6c5040b4ec 100644 --- a/src/meta/processors/configMan/SetConfigProcessor.cpp +++ b/src/meta/processors/configMan/SetConfigProcessor.cpp @@ -92,6 +92,7 @@ void SetConfigProcessor::process(const cpp2::SetConfigReq& req) { } if (!data.empty()) { + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); doPut(std::move(data)); } return; diff --git a/src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp b/src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp index cb67bb1a326..26552be8b6d 100644 --- a/src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp +++ b/src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp @@ -88,6 +88,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) { std::string(reinterpret_cast(&edgeIndex), sizeof(EdgeIndexID))); data.emplace_back(MetaServiceUtils::edgeIndexKey(space, edgeIndex), MetaServiceUtils::edgeIndexVal(indexName, indexFields)); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); LOG(INFO) << "Create Edge Index " << indexName << ", edgeIndex " << edgeIndex; resp_.set_id(to(edgeIndex, EntryType::EDGE_INDEX)); doPut(std::move(data)); diff --git a/src/meta/processors/indexMan/CreateTagIndexProcessor.cpp b/src/meta/processors/indexMan/CreateTagIndexProcessor.cpp index cbedc514d37..38cd68436bb 100644 --- a/src/meta/processors/indexMan/CreateTagIndexProcessor.cpp +++ b/src/meta/processors/indexMan/CreateTagIndexProcessor.cpp @@ -89,6 +89,7 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) { std::string(reinterpret_cast(&tagIndex), sizeof(TagIndexID))); data.emplace_back(MetaServiceUtils::tagIndexKey(space, tagIndex), MetaServiceUtils::tagIndexVal(indexName, std::move(indexFields))); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); LOG(INFO) << "Create Tag Index " << indexName << ", tagIndex " << tagIndex; resp_.set_id(to(tagIndex, EntryType::TAG_INDEX)); doPut(std::move(data)); diff --git a/src/meta/processors/indexMan/DropEdgeIndexProcessor.cpp b/src/meta/processors/indexMan/DropEdgeIndexProcessor.cpp index eecdc485007..248fc6f4094 100644 --- a/src/meta/processors/indexMan/DropEdgeIndexProcessor.cpp +++ b/src/meta/processors/indexMan/DropEdgeIndexProcessor.cpp @@ -27,6 +27,7 @@ void DropEdgeIndexProcessor::process(const cpp2::DropEdgeIndexReq& req) { keys.emplace_back(MetaServiceUtils::indexEdgeIndexKey(spaceID, indexName)); keys.emplace_back(MetaServiceUtils::edgeIndexKey(spaceID, edgeIndexID.value())); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); LOG(INFO) << "Drop Edge Index " << indexName; resp_.set_id(to(edgeIndexID.value(), EntryType::EDGE_INDEX)); doMultiRemove(keys); diff --git a/src/meta/processors/indexMan/DropTagIndexProcessor.cpp b/src/meta/processors/indexMan/DropTagIndexProcessor.cpp index b352a974fab..378bdc21141 100644 --- a/src/meta/processors/indexMan/DropTagIndexProcessor.cpp +++ b/src/meta/processors/indexMan/DropTagIndexProcessor.cpp @@ -27,6 +27,7 @@ void DropTagIndexProcessor::process(const cpp2::DropTagIndexReq& req) { keys.emplace_back(MetaServiceUtils::indexTagIndexKey(spaceID, indexName)); keys.emplace_back(MetaServiceUtils::tagIndexKey(spaceID, tagIndexID.value())); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); LOG(INFO) << "Drop Tag Index " << indexName; resp_.set_id(to(tagIndexID.value(), EntryType::TAG_INDEX)); doMultiRemove(keys); diff --git a/src/meta/processors/partsMan/CreateSpaceProcessor.cpp b/src/meta/processors/partsMan/CreateSpaceProcessor.cpp index 4812a4e563d..800fa6e6459 100644 --- a/src/meta/processors/partsMan/CreateSpaceProcessor.cpp +++ b/src/meta/processors/partsMan/CreateSpaceProcessor.cpp @@ -83,6 +83,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { } resp_.set_code(cpp2::ErrorCode::SUCCEEDED); resp_.set_id(to(spaceId, EntryType::SPACE)); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); doPut(std::move(data)); } diff --git a/src/meta/processors/partsMan/DropSpaceProcessor.cpp b/src/meta/processors/partsMan/DropSpaceProcessor.cpp index e707cea249d..b81c6faaf8f 100644 --- a/src/meta/processors/partsMan/DropSpaceProcessor.cpp +++ b/src/meta/processors/partsMan/DropSpaceProcessor.cpp @@ -44,6 +44,7 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) { // delete related role data. // TODO(boshengchen) delete related role data under the space // TODO(YT) delete Tag/Edge under the space + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); doMultiRemove(std::move(deleteKeys)); // TODO(YT) delete part files of the space } diff --git a/src/meta/processors/partsMan/ListHostsProcessor.cpp b/src/meta/processors/partsMan/ListHostsProcessor.cpp index 0604ae373f9..dc41b7e9746 100644 --- a/src/meta/processors/partsMan/ListHostsProcessor.cpp +++ b/src/meta/processors/partsMan/ListHostsProcessor.cpp @@ -50,10 +50,8 @@ StatusOr> ListHostsProcessor::allHostsWithStatus( std::vector removeHostsKey; while (iter->valid()) { cpp2::HostItem item; - nebula::cpp2::HostAddr host; - auto hostAddrPiece = iter->key().subpiece(hostPrefix.size()); - memcpy(&host, hostAddrPiece.data(), hostAddrPiece.size()); - item.set_hostAddr(host); + auto host = MetaServiceUtils::parseHostKey(iter->key()); + item.set_hostAddr(std::move(host)); HostInfo info = HostInfo::decode(iter->val()); if (now - info.lastHBTimeInMilliSec_ < FLAGS_removed_threshold_sec * 1000) { if (now - info.lastHBTimeInMilliSec_ < FLAGS_expired_threshold_sec * 1000) { @@ -97,9 +95,7 @@ StatusOr> ListHostsProcessor::allHostsWithStatus( return Status::Error("Cant't find any partitions"); } while (iter->valid()) { - auto key = iter->key(); - PartitionID partId; - memcpy(&partId, key.data() + partPrefix.size(), sizeof(PartitionID)); + PartitionID partId = MetaServiceUtils::parsePartKeyPartId(iter->key()); auto partHosts = MetaServiceUtils::parsePartVal(iter->val()); for (auto& host : partHosts) { hostParts[HostAddr(host.ip, host.port)].emplace_back(partId); @@ -176,7 +172,6 @@ void ListHostsProcessor::getLeaderDist( } } - } // namespace meta } // namespace nebula diff --git a/src/meta/processors/schemaMan/AlterEdgeProcessor.cpp b/src/meta/processors/schemaMan/AlterEdgeProcessor.cpp index e21c753e0cd..cbfb13f4f9d 100644 --- a/src/meta/processors/schemaMan/AlterEdgeProcessor.cpp +++ b/src/meta/processors/schemaMan/AlterEdgeProcessor.cpp @@ -73,6 +73,7 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) { data.emplace_back(MetaServiceUtils::schemaEdgeKey(req.get_space_id(), edgeType, version), MetaServiceUtils::schemaEdgeVal(req.get_edge_name(), schema)); resp_.set_id(to(edgeType, EntryType::EDGE)); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); doPut(std::move(data)); } diff --git a/src/meta/processors/schemaMan/AlterTagProcessor.cpp b/src/meta/processors/schemaMan/AlterTagProcessor.cpp index 8b16d96ca50..b922d23f1a3 100644 --- a/src/meta/processors/schemaMan/AlterTagProcessor.cpp +++ b/src/meta/processors/schemaMan/AlterTagProcessor.cpp @@ -73,6 +73,7 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) { data.emplace_back(MetaServiceUtils::schemaTagKey(req.get_space_id(), tagId, version), MetaServiceUtils::schemaTagVal(req.get_tag_name(), schema)); resp_.set_id(to(tagId, EntryType::TAG)); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); doPut(std::move(data)); } diff --git a/src/meta/processors/schemaMan/CreateEdgeProcessor.cpp b/src/meta/processors/schemaMan/CreateEdgeProcessor.cpp index 2b864349da8..c24ce50e319 100644 --- a/src/meta/processors/schemaMan/CreateEdgeProcessor.cpp +++ b/src/meta/processors/schemaMan/CreateEdgeProcessor.cpp @@ -119,6 +119,7 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) { LOG(INFO) << "Create Edge " << edgeName << ", edgeType " << edgeType; resp_.set_code(cpp2::ErrorCode::SUCCEEDED); resp_.set_id(to(edgeType, EntryType::EDGE)); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); doPut(std::move(data)); } diff --git a/src/meta/processors/schemaMan/CreateTagProcessor.cpp b/src/meta/processors/schemaMan/CreateTagProcessor.cpp index 3360981c116..2c3d3bf7b53 100644 --- a/src/meta/processors/schemaMan/CreateTagProcessor.cpp +++ b/src/meta/processors/schemaMan/CreateTagProcessor.cpp @@ -132,6 +132,7 @@ void CreateTagProcessor::process(const cpp2::CreateTagReq& req) { LOG(INFO) << "Create Tag " << tagName << ", TagID " << tagId; resp_.set_code(cpp2::ErrorCode::SUCCEEDED); resp_.set_id(to(tagId, EntryType::TAG)); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); doPut(std::move(data)); } diff --git a/src/meta/processors/schemaMan/DropEdgeProcessor.cpp b/src/meta/processors/schemaMan/DropEdgeProcessor.cpp index 382a600859f..144f96be08a 100644 --- a/src/meta/processors/schemaMan/DropEdgeProcessor.cpp +++ b/src/meta/processors/schemaMan/DropEdgeProcessor.cpp @@ -19,6 +19,7 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) { return; } resp_.set_code(cpp2::ErrorCode::SUCCEEDED); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); LOG(INFO) << "Drop Edge " << req.get_edge_name(); doMultiRemove(std::move(ret.value())); } diff --git a/src/meta/processors/schemaMan/DropTagProcessor.cpp b/src/meta/processors/schemaMan/DropTagProcessor.cpp index 10e5ac47ac0..14c24ec4980 100644 --- a/src/meta/processors/schemaMan/DropTagProcessor.cpp +++ b/src/meta/processors/schemaMan/DropTagProcessor.cpp @@ -20,6 +20,7 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) { return; } resp_.set_code(cpp2::ErrorCode::SUCCEEDED); + LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()); LOG(INFO) << "Drop Tag " << req.get_tag_name(); doMultiRemove(std::move(ret.value())); } diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveHostsManTest.cpp index db7203d7522..7365258c94e 100644 --- a/src/meta/test/ActiveHostsManTest.cpp +++ b/src/meta/test/ActiveHostsManTest.cpp @@ -52,6 +52,29 @@ TEST(ActiveHostsManTest, NormalTest) { ASSERT_EQ(1, ActiveHostsMan::getActiveHosts(kv.get()).size()); } +TEST(LastUpdateTimeManTest, NormalTest) { + fs::TempDir rootPath("/tmp/LastUpdateTimeManTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + + ASSERT_EQ(0, LastUpdateTimeMan::get(kv.get())); + int64_t now = time::WallClock::fastNowInMilliSec(); + + LastUpdateTimeMan::update(kv.get(), now); + ASSERT_EQ(now, LastUpdateTimeMan::get(kv.get())); + LastUpdateTimeMan::update(kv.get(), now + 100); + ASSERT_EQ(now + 100, LastUpdateTimeMan::get(kv.get())); + + LastUpdateTimeMan::update(kv.get(), now - 100); + { + auto key = MetaServiceUtils::lastUpdateTimeKey(); + std::string val; + auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val); + ASSERT_EQ(ret, kvstore::ResultCode::SUCCEEDED); + int64_t lastUpdateTime = *reinterpret_cast(val.data()); + ASSERT_EQ(now - 100, lastUpdateTime); + } +} + } // namespace meta } // namespace nebula @@ -63,4 +86,3 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } - diff --git a/src/meta/test/BalanceIntegrationTest.cpp b/src/meta/test/BalanceIntegrationTest.cpp index fc4df42a329..6587b89d243 100644 --- a/src/meta/test/BalanceIntegrationTest.cpp +++ b/src/meta/test/BalanceIntegrationTest.cpp @@ -15,7 +15,6 @@ #include "storage/test/TestUtils.h" #include "dataman/RowWriter.h" -DECLARE_int32(load_data_interval_secs); DECLARE_int32(heartbeat_interval_secs); DECLARE_uint32(raft_heartbeat_interval_secs); DECLARE_int32(expired_threshold_sec); @@ -24,7 +23,6 @@ namespace nebula { namespace meta { TEST(BalanceIntegrationTest, BalanceTest) { - FLAGS_load_data_interval_secs = 1; FLAGS_heartbeat_interval_secs = 1; FLAGS_raft_heartbeat_interval_secs = 1; FLAGS_expired_threshold_sec = 3; @@ -49,8 +47,11 @@ TEST(BalanceIntegrationTest, BalanceTest) { LOG(INFO) << "Create meta client..."; uint32_t tempDataPort = network::NetworkUtils::getAvailablePort(); HostAddr tempDataAddr(localIp, tempDataPort); - auto mClient = std::make_unique(threadPool, metaAddr, tempDataAddr, - kClusterId, false); + auto mClient = std::make_unique(threadPool, + metaAddr, + tempDataAddr, + kClusterId, + false); mClient->waitForMetadReady(); @@ -69,8 +70,11 @@ TEST(BalanceIntegrationTest, BalanceTest) { VLOG(1) << "The storage server has been added to the meta service"; - auto metaClient = std::make_shared(threadPool, metaAddr, storageAddr, - kClusterId, true); + auto metaClient = std::make_shared(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); metaClient->waitForMetadReady(); metaClients.emplace_back(metaClient); } @@ -102,7 +106,7 @@ TEST(BalanceIntegrationTest, BalanceTest) { auto tagRet = mClient->createTagSchema(spaceId, "tag", std::move(schema)).get(); ASSERT_TRUE(tagRet.ok()); auto tagId = tagRet.value(); - sleep(FLAGS_load_data_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); LOG(INFO) << "Let's write some data"; auto sClient = std::make_unique(threadPool, mClient.get()); @@ -173,8 +177,11 @@ TEST(BalanceIntegrationTest, BalanceTest) { uint32_t storagePort = network::NetworkUtils::getAvailablePort(); HostAddr storageAddr(localIp, storagePort); { - newMetaClient = std::make_unique(threadPool, metaAddr, storageAddr, - kClusterId, true); + newMetaClient = std::make_unique(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); newMetaClient->waitForMetadReady(); std::string dataPath = folly::stringPrintf("%s/%d/data", rootPath.path(), replica + 1); newServer = storage::TestUtils::mockStorageServer(newMetaClient.get(), @@ -247,7 +254,6 @@ TEST(BalanceIntegrationTest, BalanceTest) { } TEST(BalanceIntegrationTest, LeaderBalanceTest) { - FLAGS_load_data_interval_secs = 1; FLAGS_heartbeat_interval_secs = 1; FLAGS_raft_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/balance_integration_test.XXXXXX"); @@ -271,8 +277,11 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) { LOG(INFO) << "Create meta client..."; uint32_t tempDataPort = network::NetworkUtils::getAvailablePort(); HostAddr tempDataAddr(localIp, tempDataPort); - auto mClient = std::make_unique(threadPool, metaAddr, tempDataAddr, - kClusterId, false); + auto mClient = std::make_unique(threadPool, + metaAddr, + tempDataAddr, + kClusterId, + false); mClient->waitForMetadReady(); @@ -291,8 +300,11 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) { VLOG(1) << "The storage server has been added to the meta service"; - auto metaClient = std::make_shared(threadPool, metaAddr, storageAddr, - kClusterId, true); + auto metaClient = std::make_shared(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); metaClient->waitForMetadReady(); metaClients.emplace_back(metaClient); } @@ -322,6 +334,7 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) { << ", expected " << partition; sleep(1); } + auto code = balancer.leaderBalance(); ASSERT_EQ(code, cpp2::ErrorCode::SUCCEEDED); @@ -349,4 +362,3 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } - diff --git a/src/meta/test/ConfigManTest.cpp b/src/meta/test/ConfigManTest.cpp index 37445e11d42..3a351cb9268 100644 --- a/src/meta/test/ConfigManTest.cpp +++ b/src/meta/test/ConfigManTest.cpp @@ -20,7 +20,7 @@ #include "storage/test/TestUtils.h" #include "rocksdb/utilities/options_util.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); DECLARE_string(rocksdb_db_options); DECLARE_string(rocksdb_column_family_options); @@ -264,7 +264,7 @@ ConfigItem toConfigItem(const cpp2::ConfigItem& item) { } TEST(ConfigManTest, MetaConfigManTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaConfigManTest.XXXXXX"); uint32_t localMetaPort = 0; auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); @@ -312,7 +312,7 @@ TEST(ConfigManTest, MetaConfigManTest) { std::string name = "not_existed"; auto type = cpp2::ConfigType::INT64; - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); // get/set without register auto setRet = cfgMan.setConfig(module, name, type, 101l).get(); ASSERT_FALSE(setRet.ok()); @@ -335,7 +335,7 @@ TEST(ConfigManTest, MetaConfigManTest) { auto value = boost::get(item.value_); ASSERT_EQ(value, 100); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_int64_key_immutable, 100); } // mutable config @@ -356,7 +356,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, 102); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_int64_key, 102); } { @@ -376,7 +376,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, true); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_bool_key, true); } { @@ -396,7 +396,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, 3.14); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_double_key, 3.14); } { @@ -417,7 +417,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, "abc"); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_string_key, "abc"); } { @@ -445,7 +445,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(val, "8"); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); confRet = conf.parseFromString(FLAGS_nested_key); ASSERT_TRUE(confRet.ok()); status = conf.fetchAsString("max_background_compactions", val); @@ -460,7 +460,7 @@ TEST(ConfigManTest, MetaConfigManTest) { } TEST(ConfigManTest, MockConfigTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MockConfigTest.XXXXXX"); uint32_t localMetaPort = 0; auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); @@ -510,7 +510,7 @@ TEST(ConfigManTest, MockConfigTest) { } // check values in ClientBaseGflagsManager - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_test0, "updated0"); ASSERT_EQ(FLAGS_test1, "updated1"); ASSERT_EQ(FLAGS_test2, "updated2"); @@ -519,7 +519,7 @@ TEST(ConfigManTest, MockConfigTest) { } TEST(ConfigManTest, RocksdbOptionsTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/RocksdbOptionsTest.XXXXXX"); IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -541,8 +541,11 @@ TEST(ConfigManTest, RocksdbOptionsTest) { LOG(INFO) << "Create meta client..."; uint32_t storagePort = network::NetworkUtils::getAvailablePort(); HostAddr storageAddr(localIp, storagePort); - auto mClient = std::make_unique(threadPool, metaAddr, storageAddr, - kClusterId, true); + auto mClient = std::make_unique(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); mClient->waitForMetadReady(); mClient->gflagsModule_ = module; @@ -575,7 +578,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) { auto ret = mClient->createSpace("storage", 9, 1).get(); ASSERT_TRUE(ret.ok()); auto spaceId = ret.value(); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); storage::TestUtils::waitUntilAllElected(sc->kvStore_.get(), spaceId, 9); { std::string name = "rocksdb_db_options"; @@ -590,7 +593,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) { auto item = getRet.value().front(); auto value = boost::get(item.get_value()); - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); ASSERT_EQ(FLAGS_rocksdb_db_options, value); } { @@ -607,12 +610,12 @@ TEST(ConfigManTest, RocksdbOptionsTest) { auto item = getRet.value().front(); auto value = boost::get(item.get_value()); - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); ASSERT_EQ(FLAGS_rocksdb_column_family_options, value); } { // need to sleep a bit to take effect on rocksdb - sleep(3); + sleep(FLAGS_heartbeat_interval_secs + 2); rocksdb::DBOptions loadedDbOpt; std::vector loadedCfDescs; std::string rocksPath = folly::stringPrintf("%s/disk1/nebula/%d/data", diff --git a/src/meta/test/HBProcessorTest.cpp b/src/meta/test/HBProcessorTest.cpp index 6eda99c2e0b..e654d99a707 100644 --- a/src/meta/test/HBProcessorTest.cpp +++ b/src/meta/test/HBProcessorTest.cpp @@ -17,9 +17,6 @@ DECLARE_bool(hosts_whitelist_enabled); namespace nebula { namespace meta { -using nebula::cpp2::SupportedType; -using apache::thrift::FragileConstructor::FRAGILE; - TEST(HBProcessorTest, HBTest) { fs::TempDir rootPath("/tmp/HBTest.XXXXXX"); std::unique_ptr kv(TestUtils::initKV(rootPath.path())); @@ -27,6 +24,7 @@ TEST(HBProcessorTest, HBTest) { { for (auto i = 0; i < 5; i++) { cpp2::HBReq req; + req.set_in_storaged(true); nebula::cpp2::HostAddr thriftHost; thriftHost.set_ip(i); thriftHost.set_port(i); @@ -69,4 +67,3 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } - diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index 62fd7f8148f..35b43a626b7 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -17,7 +17,6 @@ #include "meta/test/TestUtils.h" #include "meta/ClientBasedGflagsManager.h" -DECLARE_int32(load_data_interval_secs); DECLARE_int32(heartbeat_interval_secs); DECLARE_string(rocksdb_db_options); @@ -30,7 +29,7 @@ using nebula::cpp2::Value; using nebula::cpp2::ValueType; TEST(MetaClientTest, InterfacesTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); // Let the system choose an available port for us @@ -45,7 +44,9 @@ TEST(MetaClientTest, InterfacesTest) { HostAddr localHost{localIp, clientPort}; auto client = std::make_shared(threadPool, std::vector{HostAddr(localIp, sc->port_)}, - localHost); + localHost, + 0, + false); client->waitForMetadReady(); { // Add hosts automatically, then testing listHosts interface. @@ -159,7 +160,7 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_EQ(ret1.value().begin()->schema.columns.size(), 5); // getTagSchemaFromCache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); auto ret = client->getNewestTagVerFromCache(spaceId, ret1.value().begin()->tag_id); CHECK(ret.ok()); @@ -232,7 +233,7 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_STREQ("edgeItem0", outSchema1->getFieldName(0)); } } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); { // Test cache interfaces auto partsMap = client->getPartsMapFromCache(HostAddr(0, 0)); @@ -331,7 +332,7 @@ TEST(MetaClientTest, InterfacesTest) { } TEST(MetaClientTest, TagTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTagTest.XXXXXX"); // Let the system choose an available port for us @@ -342,7 +343,7 @@ TEST(MetaClientTest, TagTest) { IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); auto localhosts = std::vector{HostAddr(localIp, sc->port_)}; - auto client = std::make_shared(threadPool, localhosts); + auto client = std::make_shared(threadPool, localhosts, HostAddr(0, 0), 0, false); std::vector hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}}; client->waitForMetadReady(); TestUtils::registerHB(sc->kvStore_.get(), hosts); @@ -448,7 +449,7 @@ TEST(MetaClientTest, TagTest) { } TEST(MetaClientTest, EdgeTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientEdgeTest.XXXXXX"); // Let the system choose an available port for us @@ -571,7 +572,7 @@ TEST(MetaClientTest, EdgeTest) { } TEST(MetaClientTest, TagIndexTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTagIndexTest.XXXXXX"); // Let the system choose an available port for us @@ -769,7 +770,7 @@ TEST(MetaClientTest, TagIndexTest) { } TEST(MetaClientTest, EdgeIndexTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientEdgeIndexTest.XXXXXX"); // Let the system choose an available port for us @@ -1000,6 +1001,12 @@ class TestListener : public MetaChangedListener { partChanged++; } + void fetchLeaderInfo(std::unordered_map>& leaderIds) override { + LOG(INFO) << "Get leader distribution!"; + UNUSED(leaderIds); + } + HostAddr getLocalHost() { return HostAddr(0, 0); } @@ -1011,7 +1018,7 @@ class TestListener : public MetaChangedListener { }; TEST(MetaClientTest, DiffTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); // Let the system choose an available port for us @@ -1023,7 +1030,10 @@ TEST(MetaClientTest, DiffTest) { network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); auto listener = std::make_unique(); auto client = std::make_shared(threadPool, - std::vector{HostAddr(localIp, sc->port_)}); + std::vector{HostAddr(localIp, sc->port_)}, + HostAddr(0, 0), + 0, + false); client->waitForMetadReady(); client->registerListener(listener.get()); { @@ -1043,14 +1053,14 @@ TEST(MetaClientTest, DiffTest) { auto ret = client->createSpace("default_space", 9, 1).get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(1, listener->spaceNum); ASSERT_EQ(9, listener->partNum); { auto ret = client->createSpace("default_space_1", 5, 1).get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(2, listener->spaceNum); ASSERT_EQ(14, listener->partNum); { @@ -1058,13 +1068,12 @@ TEST(MetaClientTest, DiffTest) { auto ret = client->dropSpace("default_space_1").get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(1, listener->spaceNum); ASSERT_EQ(9, listener->partNum); } TEST(MetaClientTest, HeartbeatTest) { - FLAGS_load_data_interval_secs = 5; FLAGS_heartbeat_interval_secs = 1; const nebula::ClusterID kClusterId = 10; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); @@ -1081,7 +1090,7 @@ TEST(MetaClientTest, HeartbeatTest) { std::vector{HostAddr(localIp, 10001)}, localHost, kClusterId, - true); // send heartbeat + true); client->waitForMetadReady(); client->registerListener(listener.get()); { @@ -1148,6 +1157,7 @@ class TestMetaServiceRetry : public cpp2::MetaServiceSvIf { }; TEST(MetaClientTest, SimpleTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -1173,6 +1183,7 @@ TEST(MetaClientTest, SimpleTest) { } TEST(MetaClientTest, RetryWithExceptionTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -1195,6 +1206,7 @@ TEST(MetaClientTest, RetryWithExceptionTest) { } TEST(MetaClientTest, RetryOnceTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -1234,6 +1246,7 @@ TEST(MetaClientTest, RetryOnceTest) { } TEST(MetaClientTest, RetryUntilLimitTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -1272,7 +1285,7 @@ TEST(MetaClientTest, RetryUntilLimitTest) { } TEST(MetaClientTest, RocksdbOptionsTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/RocksdbOptionsTest.XXXXXX"); uint32_t localMetaPort = 0; auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); @@ -1308,7 +1321,7 @@ TEST(MetaClientTest, RocksdbOptionsTest) { std::vector hosts = {{0, 0}}; TestUtils::registerHB(sc->kvStore_.get(), hosts); client->createSpace("default_space", 9, 1).get(); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); } { std::string name = "rocksdb_db_options"; @@ -1324,7 +1337,7 @@ TEST(MetaClientTest, RocksdbOptionsTest) { auto item = getRet.value().front(); auto value = boost::get(item.get_value()); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_rocksdb_db_options, value); ASSERT_EQ(listener->options["disable_auto_compactions"], "true"); ASSERT_EQ(listener->options["level0_file_num_compaction_trigger"], "4"); diff --git a/src/meta/test/MetaHttpDownloadHandlerTest.cpp b/src/meta/test/MetaHttpDownloadHandlerTest.cpp index cf6e179c6de..72d439faa4a 100644 --- a/src/meta/test/MetaHttpDownloadHandlerTest.cpp +++ b/src/meta/test/MetaHttpDownloadHandlerTest.cpp @@ -14,7 +14,6 @@ #include "storage/http/StorageHttpDownloadHandler.h" #include "fs/TempDir.h" -DECLARE_int32(load_data_interval_secs); DECLARE_string(pid_file); DECLARE_int32(ws_storage_http_port); diff --git a/src/meta/test/MetaHttpStatusHandlerTest.cpp b/src/meta/test/MetaHttpStatusHandlerTest.cpp index 830ed25c3d3..6e3257187d5 100644 --- a/src/meta/test/MetaHttpStatusHandlerTest.cpp +++ b/src/meta/test/MetaHttpStatusHandlerTest.cpp @@ -13,7 +13,6 @@ #include "meta/test/TestUtils.h" #include "fs/TempDir.h" -DECLARE_int32(load_data_interval_secs); DECLARE_string(pid_file); namespace nebula { diff --git a/src/meta/test/MetaServiceUtilsTest.cpp b/src/meta/test/MetaServiceUtilsTest.cpp index 4ddb02a3d9c..78befca356b 100644 --- a/src/meta/test/MetaServiceUtilsTest.cpp +++ b/src/meta/test/MetaServiceUtilsTest.cpp @@ -112,4 +112,3 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } - diff --git a/src/storage/mutate/UpdateEdgeProcessor.cpp b/src/storage/mutate/UpdateEdgeProcessor.cpp index 623f29334ee..aa3c9e584dd 100644 --- a/src/storage/mutate/UpdateEdgeProcessor.cpp +++ b/src/storage/mutate/UpdateEdgeProcessor.cpp @@ -391,7 +391,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { this->spaceId_ = req.get_space_id(); insertable_ = req.get_insertable(); auto partId = req.get_part_id(); - auto edgeKey = std::move(req).get_edge_key(); + auto edgeKey = req.get_edge_key(); std::vector eTypes; eTypes.emplace_back(edgeKey.get_edge_type()); this->initEdgeContext(eTypes); @@ -402,7 +402,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { this->onFinished(); return; } - updateItems_ = std::move(req).get_update_items(); + updateItems_ = req.get_update_items(); VLOG(3) << "Update edge, spaceId: " << this->spaceId_ << ", partId: " << partId << ", src: " << edgeKey.get_src() << ", edge_type: " << edgeKey.get_edge_type() diff --git a/src/storage/mutate/UpdateVertexProcessor.cpp b/src/storage/mutate/UpdateVertexProcessor.cpp index 4f817454d8a..b71e4eadb7e 100644 --- a/src/storage/mutate/UpdateVertexProcessor.cpp +++ b/src/storage/mutate/UpdateVertexProcessor.cpp @@ -363,7 +363,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) { return; } auto vId = req.get_vertex_id(); - updateItems_ = std::move(req).get_update_items(); + updateItems_ = req.get_update_items(); VLOG(3) << "Update vertex, spaceId: " << this->spaceId_ << ", partId: " << partId << ", vId: " << vId; diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 4b8cc7e9b79..68b34e7da11 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -16,14 +16,12 @@ #include "network/NetworkUtils.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_secs); DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace storage { TEST(StorageClientTest, VerticesInterfacesTest) { - FLAGS_load_data_interval_secs = 1; FLAGS_heartbeat_interval_secs = 1; const nebula::ClusterID kClusterId = 10; fs::TempDir rootPath("/tmp/StorageClientTest.XXXXXX"); @@ -75,7 +73,7 @@ TEST(StorageClientTest, VerticesInterfacesTest) { ASSERT_TRUE(ret.ok()) << ret.status(); spaceId = ret.value(); LOG(INFO) << "Created space \"default\", its id is " << spaceId; - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); TestUtils::waitUntilAllElected(sc->kvStore_.get(), spaceId, 10); auto client = std::make_unique(threadPool, mClient.get()); diff --git a/src/storage/test/StorageHttpDownloadHandlerTest.cpp b/src/storage/test/StorageHttpDownloadHandlerTest.cpp index 710eeeb0955..607ce5ed885 100644 --- a/src/storage/test/StorageHttpDownloadHandlerTest.cpp +++ b/src/storage/test/StorageHttpDownloadHandlerTest.cpp @@ -14,7 +14,6 @@ #include "fs/TempDir.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_secs); namespace nebula { namespace storage { diff --git a/src/storage/test/StorageHttpStatusHandlerTest.cpp b/src/storage/test/StorageHttpStatusHandlerTest.cpp index e7c9524ca4b..debddc216d9 100644 --- a/src/storage/test/StorageHttpStatusHandlerTest.cpp +++ b/src/storage/test/StorageHttpStatusHandlerTest.cpp @@ -15,7 +15,6 @@ #include "http/HttpClient.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_secs); namespace nebula { namespace storage { diff --git a/src/tools/storage-perf/StorageIntegrityTool.cpp b/src/tools/storage-perf/StorageIntegrityTool.cpp index 7608d5b5614..6a3e658fafe 100644 --- a/src/tools/storage-perf/StorageIntegrityTool.cpp +++ b/src/tools/storage-perf/StorageIntegrityTool.cpp @@ -22,7 +22,7 @@ DEFINE_int64(first_vertex_id, 1, "The smallest vertex id"); DEFINE_uint64(width, 100, "width of matrix"); DEFINE_uint64(height, 1000, "height of matrix"); -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace storage { @@ -38,7 +38,7 @@ namespace storage { * update it after we suppport preheat. The tag must have only one int property, * which is prop_name. * 2. If the space and tag doesn't exists, it will try to create one, maybe you need to set - * load_data_interval_secs to make sure the storage service has load meta. + * heartbeat_interval_secs to make sure the storage service has load meta. * 3. The width and height is the size of the big linked list, you can refer to the graph below. * As expected, we can traverse the big linked list after width * height steps starting * from any node in the list. @@ -66,7 +66,7 @@ class IntegrityTest { private: bool init() { - FLAGS_load_data_interval_secs = 10; + FLAGS_heartbeat_interval_secs = 10; auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs); if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) { LOG(ERROR) << "Can't get metaServer address, status: " << metaAddrsRet.status() @@ -95,7 +95,7 @@ class IntegrityTest { auto tagResult = mClient_->getTagIDByNameFromCache(spaceId_, FLAGS_tag_name); if (!tagResult.ok()) { - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); LOG(ERROR) << "Get tagId failed, try to create one: " << tagResult.status(); nebula::cpp2::Schema schema; nebula::cpp2::ColumnDef column;