From 5b343c0f3790df82791c7ae94156315f0866c8b3 Mon Sep 17 00:00:00 2001 From: zhangguoqing Date: Mon, 28 Oct 2019 13:05:04 +0800 Subject: [PATCH] Improve heartbeat between metad and storaged by MetaClient 1. Add the LastUpdateTimeMan to record the latest metadata change(insert, update, delete) time on the MetaServer side. 2. Enrich the information(lastUpdateTime, partitions leaderDist) of `heartbeat` exchange between MetaServer and MetaClient at storage node. 3. It could optimize the load data logic inside meta client by lastUpdateTime, which makes metadata synchronization smarter and more timely. 4. Remove sendHeartBeat_ and load_data_interval_secs from MetaClient class, and add the bool inStoraged_ to indicate whether a metaclient is in storaged. close #1173 close #1060 close #284 --- src/daemons/MetaDaemon.cpp | 2 +- src/graph/ExecutionEngine.cpp | 6 +- src/graph/test/ConfigTest.cpp | 2 - src/graph/test/DataTest.cpp | 4 +- src/graph/test/GroupByLimitTest.cpp | 2 - src/graph/test/SchemaTest.cpp | 3 - src/graph/test/TestEnv.cpp | 6 +- src/graph/test/TraverseTestBase.h | 4 +- src/graph/test/UpdateTest.cpp | 2 - src/graph/test/UpdateTestBase.h | 4 +- src/interface/meta.thrift | 7 +- src/kvstore/NebulaStore.h | 8 +- src/kvstore/PartManager.cpp | 9 ++ src/kvstore/PartManager.h | 5 + src/meta/ActiveHostsMan.h | 65 ------------- .../{ActiveHostsMan.cpp => ActiveMan.cpp} | 45 +++++++-- src/meta/ActiveMan.h | 80 ++++++++++++++++ src/meta/CMakeLists.txt | 2 +- src/meta/MetaServiceUtils.cpp | 75 +++++++++++++-- src/meta/MetaServiceUtils.h | 20 +++- src/meta/client/MetaClient.cpp | 92 +++++++++---------- src/meta/client/MetaClient.h | 23 +++-- src/meta/processors/BaseProcessor.h | 12 ++- src/meta/processors/BaseProcessor.inl | 27 +++++- src/meta/processors/Common.h | 1 + src/meta/processors/admin/AdminClient.cpp | 2 +- src/meta/processors/admin/BalancePlan.cpp | 2 +- src/meta/processors/admin/BalanceProcessor.h | 2 +- src/meta/processors/admin/BalanceTask.cpp | 4 + src/meta/processors/admin/BalanceTask.h | 2 +- src/meta/processors/admin/Balancer.cpp | 1 - src/meta/processors/admin/HBProcessor.cpp | 28 +++++- .../processors/customKV/MultiPutProcessor.cpp | 2 +- .../processors/customKV/RemoveProcessor.cpp | 2 +- .../customKV/RemoveRangeProcessor.cpp | 2 +- .../partsMan/CreateSpaceProcessor.cpp | 2 +- .../partsMan/ListHostsProcessor.cpp | 32 +++---- .../partsMan/ListPartsProcessor.cpp | 2 +- ...tiveHostsManTest.cpp => ActiveManTest.cpp} | 42 +++++++-- src/meta/test/BalanceIntegrationTest.cpp | 43 +++++---- src/meta/test/CMakeLists.txt | 4 +- src/meta/test/ConfigManTest.cpp | 39 ++++---- src/meta/test/HBProcessorTest.cpp | 29 ++++++ src/meta/test/MetaClientTest.cpp | 47 ++++++---- src/meta/test/MetaHttpDownloadHandlerTest.cpp | 1 - src/meta/test/MetaHttpStatusHandlerTest.cpp | 1 - src/meta/test/MetaServiceUtilsTest.cpp | 33 ++++++- src/meta/test/TestUtils.h | 4 +- src/storage/UpdateEdgeProcessor.cpp | 4 +- src/storage/UpdateVertexProcessor.cpp | 2 +- src/storage/test/StorageClientTest.cpp | 4 +- .../test/StorageHttpDownloadHandlerTest.cpp | 1 - .../test/StorageHttpStatusHandlerTest.cpp | 1 - .../storage-perf/StorageIntegrityTool.cpp | 8 +- 54 files changed, 567 insertions(+), 285 deletions(-) delete mode 100644 src/meta/ActiveHostsMan.h rename src/meta/{ActiveHostsMan.cpp => ActiveMan.cpp} (57%) create mode 100644 src/meta/ActiveMan.h rename src/meta/test/{ActiveHostsManTest.cpp => ActiveManTest.cpp} (53%) diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index 43e2f363211..15c95fa6c42 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -20,7 +20,7 @@ #include "kvstore/PartManager.h" #include "meta/ClusterIdMan.h" #include "kvstore/NebulaStore.h" -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" #include "meta/KVBasedGflagsManager.h" using nebula::operator<<; diff --git a/src/graph/ExecutionEngine.cpp b/src/graph/ExecutionEngine.cpp index 9af8ca8ce5e..fcad01cb9cc 100644 --- a/src/graph/ExecutionEngine.cpp +++ b/src/graph/ExecutionEngine.cpp @@ -28,7 +28,11 @@ Status ExecutionEngine::init(std::shared_ptr ioExec if (!addrs.ok()) { return addrs.status(); } - metaClient_ = std::make_unique(ioExecutor, std::move(addrs.value())); + metaClient_ = std::make_unique(ioExecutor, + std::move(addrs.value()), + HostAddr(0, 0), + 0, + false); // load data try 3 time bool loadDataOk = metaClient_->waitForMetadReady(3); if (!loadDataOk) { diff --git a/src/graph/test/ConfigTest.cpp b/src/graph/test/ConfigTest.cpp index 20bc3bf85be..4eb93e14b0a 100644 --- a/src/graph/test/ConfigTest.cpp +++ b/src/graph/test/ConfigTest.cpp @@ -12,8 +12,6 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { diff --git a/src/graph/test/DataTest.cpp b/src/graph/test/DataTest.cpp index 415ea0fd45e..44109eca117 100644 --- a/src/graph/test/DataTest.cpp +++ b/src/graph/test/DataTest.cpp @@ -9,7 +9,7 @@ #include "graph/test/TestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -134,7 +134,7 @@ AssertionResult DataTest::prepareSchema() { << " failed, error code "<< static_cast(code); } } - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); return TestOK(); } diff --git a/src/graph/test/GroupByLimitTest.cpp b/src/graph/test/GroupByLimitTest.cpp index 54eadd45709..2c630c16029 100644 --- a/src/graph/test/GroupByLimitTest.cpp +++ b/src/graph/test/GroupByLimitTest.cpp @@ -10,8 +10,6 @@ #include "graph/test/TraverseTestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { diff --git a/src/graph/test/SchemaTest.cpp b/src/graph/test/SchemaTest.cpp index 14f71751827..8bd0137a978 100644 --- a/src/graph/test/SchemaTest.cpp +++ b/src/graph/test/SchemaTest.cpp @@ -10,8 +10,6 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { @@ -686,7 +684,6 @@ TEST_F(SchemaTest, metaCommunication) { ASSERT_EQ(1, (*(resp.get_rows())).size()); } - sleep(FLAGS_load_data_interval_secs + 1); int retry = 60; while (retry-- > 0) { auto spaceResult = gEnv->metaClient()->getSpaceIdByNameFromCache("default_space"); diff --git a/src/graph/test/TestEnv.cpp b/src/graph/test/TestEnv.cpp index 118affc1010..e591bf995b2 100644 --- a/src/graph/test/TestEnv.cpp +++ b/src/graph/test/TestEnv.cpp @@ -9,7 +9,7 @@ #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); DECLARE_string(meta_server_addrs); namespace nebula { @@ -27,7 +27,7 @@ TestEnv::~TestEnv() { void TestEnv::SetUp() { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; const nebula::ClusterID kClusterId = 10; // Create metaServer metaServer_ = nebula::meta::TestUtils::mockMetaServer( @@ -72,7 +72,7 @@ void TestEnv::SetUp() { void TestEnv::TearDown() { // TO make sure the drop space be invoked on storage server - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); graphServer_.reset(); storageServer_.reset(); mClient_.reset(); diff --git a/src/graph/test/TraverseTestBase.h b/src/graph/test/TraverseTestBase.h index f1d2cb80285..017bc52979e 100644 --- a/src/graph/test/TraverseTestBase.h +++ b/src/graph/test/TraverseTestBase.h @@ -12,7 +12,7 @@ #include "graph/test/TestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -403,7 +403,7 @@ AssertionResult TraverseTestBase::prepareSchema() { return TestError() << "Do cmd:" << cmd << " failed"; } } - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); return TestOK(); } diff --git a/src/graph/test/UpdateTest.cpp b/src/graph/test/UpdateTest.cpp index cb7f223d699..9d2f11fb80c 100644 --- a/src/graph/test/UpdateTest.cpp +++ b/src/graph/test/UpdateTest.cpp @@ -10,8 +10,6 @@ #include "graph/test/UpdateTestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); - namespace nebula { namespace graph { diff --git a/src/graph/test/UpdateTestBase.h b/src/graph/test/UpdateTestBase.h index 7eefc7aed06..d5e92120a57 100644 --- a/src/graph/test/UpdateTestBase.h +++ b/src/graph/test/UpdateTestBase.h @@ -12,7 +12,7 @@ #include "graph/test/TestBase.h" #include "meta/test/TestUtils.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace graph { @@ -118,7 +118,7 @@ AssertionResult UpdateTestBase::prepareSchema() { return TestError() << "Do cmd:" << cmd << " failed"; } } - sleep(FLAGS_load_data_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + 3); return TestOK(); } diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 8b0feadac1e..82d600ad93d 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -357,11 +357,14 @@ struct HBResp { 1: ErrorCode code, 2: common.HostAddr leader, 3: common.ClusterID cluster_id, + 4: i64 last_update_time_in_ms, } struct HBReq { - 1: common.HostAddr host, - 2: common.ClusterID cluster_id, + 1: bool in_storaged, + 2: common.HostAddr host, + 3: common.ClusterID cluster_id, + 4: optional map> (cpp.template = "std::unordered_map") leader_parts; } struct CreateUserReq { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index f656e79c5ab..d30a24a88f9 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -161,11 +161,10 @@ class NebulaStore : public KVStore, public Handler { ResultCode flush(GraphSpaceID spaceId) override; - int32_t allLeader(std::unordered_map>& leaderIds) override; - bool isLeader(GraphSpaceID spaceId, PartitionID partId); + ErrorOr> space(GraphSpaceID spaceId); + /** * Implement four interfaces in Handler. * */ @@ -177,7 +176,8 @@ class NebulaStore : public KVStore, public Handler { void removePart(GraphSpaceID spaceId, PartitionID partId) override; - ErrorOr> space(GraphSpaceID spaceId); + int32_t allLeader(std::unordered_map>& leaderIds) override; private: void updateSpaceOption(GraphSpaceID spaceId, diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp index d0ddd73c7bf..10394cc3b96 100644 --- a/src/kvstore/PartManager.cpp +++ b/src/kvstore/PartManager.cpp @@ -155,5 +155,14 @@ void MetaServerBasedPartManager::onPartUpdated(const PartMeta& partMeta) { UNUSED(partMeta); } +void MetaServerBasedPartManager::fetchLeaderInfo( + std::unordered_map>& leaderIds) { + if (handler_ != nullptr) { + handler_->allLeader(leaderIds); + } else { + VLOG(1) << "handler_ is nullptr!"; + } +} + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index 6480e9be7a9..551c537462e 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -24,6 +24,8 @@ class Handler { bool isDbOption) = 0; virtual void removeSpace(GraphSpaceID spaceId) = 0; virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0; + virtual int32_t allLeader(std::unordered_map>& leaderIds) = 0; }; @@ -164,6 +166,9 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL void onPartUpdated(const PartMeta& partMeta) override; + void fetchLeaderInfo(std::unordered_map>& leaderIds) override; + HostAddr getLocalHost() { return localHost_; } diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h deleted file mode 100644 index 8f3a06198bd..00000000000 --- a/src/meta/ActiveHostsMan.h +++ /dev/null @@ -1,65 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. - */ - -#ifndef META_ACTIVEHOSTSMAN_H_ -#define META_ACTIVEHOSTSMAN_H_ - -#include "base/Base.h" -#include -#include "kvstore/KVStore.h" - -namespace nebula { -namespace meta { - -struct HostInfo { - HostInfo() = default; - explicit HostInfo(int64_t lastHBTimeInMilliSec) - : lastHBTimeInMilliSec_(lastHBTimeInMilliSec) {} - - bool operator==(const HostInfo& that) const { - return this->lastHBTimeInMilliSec_ == that.lastHBTimeInMilliSec_; - } - - bool operator!=(const HostInfo& that) const { - return !(*this == that); - } - - int64_t lastHBTimeInMilliSec_ = 0; - - static std::string encode(const HostInfo& info) { - std::string encode; - encode.reserve(sizeof(int64_t)); - encode.append(reinterpret_cast(&info.lastHBTimeInMilliSec_), sizeof(int64_t)); - return encode; - } - - static HostInfo decode(const folly::StringPiece& data) { - HostInfo info; - info.lastHBTimeInMilliSec_ = *reinterpret_cast(data.data()); - return info; - } -}; - -class ActiveHostsMan final { -public: - ~ActiveHostsMan() = default; - - static kvstore::ResultCode updateHostInfo(kvstore::KVStore* kv, - const HostAddr& hostAddr, - const HostInfo& info); - - static std::vector getActiveHosts(kvstore::KVStore* kv, int32_t expiredTTL = 0); - - static bool isLived(kvstore::KVStore* kv, const HostAddr& host); - -protected: - ActiveHostsMan() = default; -}; - -} // namespace meta -} // namespace nebula - -#endif // META_ACTIVEHOSTSMAN_H_ diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveMan.cpp similarity index 57% rename from src/meta/ActiveHostsMan.cpp rename to src/meta/ActiveMan.cpp index 7021f6ce42f..d4e1107f20e 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveMan.cpp @@ -4,7 +4,7 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" #include "meta/MetaServiceUtils.h" #include "meta/processors/Common.h" @@ -14,13 +14,42 @@ DEFINE_int32(expired_threshold_sec, 10 * 60, namespace nebula { namespace meta { +kvstore::ResultCode LastUpdateTimeMan::update(kvstore::KVStore* kv, const TimeInfo& info) { + CHECK_NOTNULL(kv); + std::vector data; + data.emplace_back(MetaServiceUtils::lastUpdateTimeKey(), TimeInfo::encode(info)); + folly::SharedMutex::WriteHolder wHolder(LockUtils::lastUpdateTimeLock()); + folly::Baton baton; + kvstore::ResultCode ret; + kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), + [&] (kvstore::ResultCode code) { + ret = code; + baton.post(); + }); + baton.wait(); + return ret; +} + +int64_t LastUpdateTimeMan::get(kvstore::KVStore* kv) { + CHECK_NOTNULL(kv); + auto key = MetaServiceUtils::lastUpdateTimeKey(); + std::string val; + auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val); + if (ret == kvstore::ResultCode::SUCCEEDED) { + TimeInfo info = TimeInfo::decode(val); + return info.lastTime_; + } + return 0; +} + + kvstore::ResultCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv, const HostAddr& hostAddr, - const HostInfo& info) { + const TimeInfo& info) { CHECK_NOTNULL(kv); std::vector data; data.emplace_back(MetaServiceUtils::hostKey(hostAddr.first, hostAddr.second), - HostInfo::encode(info)); + TimeInfo::encode(info)); folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); folly::Baton baton; kvstore::ResultCode ret; @@ -44,10 +73,12 @@ std::vector ActiveHostsMan::getActiveHosts(kvstore::KVStore* kv, int32 int64_t threshold = (expiredTTL == 0 ? FLAGS_expired_threshold_sec : expiredTTL) * 1000; auto now = time::WallClock::fastNowInMilliSec(); while (iter->valid()) { - auto host = MetaServiceUtils::parseHostKey(iter->key()); - HostInfo info = HostInfo::decode(iter->val()); - if (now - info.lastHBTimeInMilliSec_ < threshold) { - hosts.emplace_back(host.ip, host.port); + if (!MetaServiceUtils::isHostKeyWithSpaceId(iter->key())) { + auto host = MetaServiceUtils::parseHostKeyAddr(iter->key()); + TimeInfo info = TimeInfo::decode(iter->val()); + if (now - info.lastTime_ < threshold) { + hosts.emplace_back(host.ip, host.port); + } } iter->next(); } diff --git a/src/meta/ActiveMan.h b/src/meta/ActiveMan.h new file mode 100644 index 00000000000..38f8977e83d --- /dev/null +++ b/src/meta/ActiveMan.h @@ -0,0 +1,80 @@ +/* Copyright (c) 2018 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_ACTIVEMAN_H_ +#define META_ACTIVEMAN_H_ + +#include "base/Base.h" +#include +#include "kvstore/KVStore.h" +#include "meta/MetaServiceUtils.h" + +namespace nebula { +namespace meta { + +struct TimeInfo { + TimeInfo() = default; + explicit TimeInfo(int64_t lastTime) + : lastTime_(lastTime) {} + + bool operator==(const TimeInfo& that) const { + return this->lastTime_ == that.lastTime_; + } + + bool operator!=(const TimeInfo& that) const { + return !(*this == that); + } + + int64_t lastTime_ = 0; + + static std::string encode(const TimeInfo& info) { + std::string encode; + encode.reserve(sizeof(int64_t)); + encode.append(reinterpret_cast(&info.lastTime_), sizeof(int64_t)); + return encode; + } + + static TimeInfo decode(const folly::StringPiece& data) { + TimeInfo info; + info.lastTime_ = *reinterpret_cast(data.data()); + return info; + } +}; + + +class LastUpdateTimeMan final { +public: + ~LastUpdateTimeMan() = default; + + static kvstore::ResultCode update(kvstore::KVStore* kv, const TimeInfo& info); + + static int64_t get(kvstore::KVStore* kv); + +protected: + LastUpdateTimeMan() = default; +}; + + +class ActiveHostsMan final { +public: + ~ActiveHostsMan() = default; + + static kvstore::ResultCode updateHostInfo(kvstore::KVStore* kv, + const HostAddr& hostAddr, + const TimeInfo& info); + + static std::vector getActiveHosts(kvstore::KVStore* kv, int32_t expiredTTL = 0); + + static bool isLived(kvstore::KVStore* kv, const HostAddr& host); + +protected: + ActiveHostsMan() = default; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_ACTIVEMAN_H_ diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 747e769fd50..b603a5a02ac 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -9,7 +9,7 @@ nebula_add_library( meta_service_handler OBJECT MetaServiceHandler.cpp MetaServiceUtils.cpp - ActiveHostsMan.cpp + ActiveMan.cpp processors/partsMan/ListHostsProcessor.cpp processors/partsMan/ListPartsProcessor.cpp processors/partsMan/CreateSpaceProcessor.cpp diff --git a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp index 140ea73014f..5bd1f1f528e 100644 --- a/src/meta/MetaServiceUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -20,11 +20,18 @@ const std::string kIndexTable = "__index__"; // NOLINT const std::string kUsersTable = "__users__"; // NOLINT const std::string kRolesTable = "__roles__"; // NOLINT const std::string kConfigsTable = "__configs__"; // NOLINT - +const std::string kLastUpdateTimeTable = "__last_update_time__"; // NOLINT const std::string kHostOnline = "Online"; // NOLINT const std::string kHostOffline = "Offline"; // NOLINT +std::string MetaServiceUtils::lastUpdateTimeKey() { + std::string key; + key.reserve(128); + key.append(kLastUpdateTimeTable.data(), kLastUpdateTimeTable.size()); + return key; +} + std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) { std::string key; key.reserve(128); @@ -39,7 +46,7 @@ std::string MetaServiceUtils::spaceVal(const cpp2::SpaceProperties &properties) return val; } -cpp2::SpaceProperties MetaServiceUtils::parseSpace(folly::StringPiece rawData) { +cpp2::SpaceProperties MetaServiceUtils::parseSpace(folly::StringPiece rawData) { cpp2::SpaceProperties properties; apache::thrift::CompactSerializer::deserialize(rawData, properties); return properties; @@ -66,6 +73,16 @@ std::string MetaServiceUtils::partKey(GraphSpaceID spaceId, PartitionID partId) return key; } +GraphSpaceID MetaServiceUtils::parsePartKeySpaceId(folly::StringPiece key) { + return *reinterpret_cast(key.data() + kPartsTable.size()); +} + +PartitionID MetaServiceUtils::parsePartKeyPartId(folly::StringPiece key) { + return *reinterpret_cast(key.data() + + kPartsTable.size() + + sizeof(GraphSpaceID)); +} + std::string MetaServiceUtils::partVal(const std::vector& hosts) { std::string val; val.reserve(128); @@ -110,6 +127,34 @@ std::string MetaServiceUtils::hostKey(IPv4 ip, Port port) { return key; } +std::string MetaServiceUtils::hostKey(IPv4 ip, Port port, GraphSpaceID spaceId) { + std::string key = MetaServiceUtils::hostKey(ip, port); + key.append(reinterpret_cast(&spaceId), sizeof(GraphSpaceID)); + return key; +} + +const std::string& MetaServiceUtils::hostPrefix() { + return kHostsTable; +} + +bool MetaServiceUtils::isHostKeyWithSpaceId(const folly::StringPiece& key) { + if (key.size() == kHostsTable.size() + sizeof(IPv4) + sizeof(Port) + sizeof(GraphSpaceID)) { + return true; + } + return false; +} + +nebula::cpp2::HostAddr MetaServiceUtils::parseHostKeyAddr(folly::StringPiece key) { + nebula::cpp2::HostAddr host; + memcpy(&host, key.data() + kHostsTable.size(), sizeof(host)); + return host; +} + +GraphSpaceID MetaServiceUtils::parseHostKeySpaceId(folly::StringPiece key) { + auto offset = kHostsTable.size() + sizeof(IPv4) + sizeof(Port); + return *reinterpret_cast(key.data() + offset); +} + std::string MetaServiceUtils::hostValOnline() { return kHostOnline; } @@ -118,14 +163,28 @@ std::string MetaServiceUtils::hostValOffline() { return kHostOffline; } -const std::string& MetaServiceUtils::hostPrefix() { - return kHostsTable; +std::string MetaServiceUtils::hostVal(const std::vector& partIds) { + std::string val; + val.reserve(128); + for (auto& partId : partIds) { + val.append(reinterpret_cast(&partId), sizeof(PartitionID)); + } + return val; } -nebula::cpp2::HostAddr MetaServiceUtils::parseHostKey(folly::StringPiece key) { - nebula::cpp2::HostAddr host; - memcpy(&host, key.data() + kHostsTable.size(), sizeof(host)); - return host; +std::vector MetaServiceUtils::parseHostVal(folly::StringPiece val) { + std::vector partIds; + static const size_t unitSize = sizeof(PartitionID); + auto partsNum = val.size() / unitSize; + partIds.reserve(partsNum); + VLOG(3) << "Total size:" << val.size() + << ", part size:" << unitSize + << ", parts num:" << partsNum; + for (decltype(partsNum) i = 0; i < partsNum; i++) { + PartitionID partId = *reinterpret_cast(val.data() + i * unitSize); + partIds.emplace_back(partId); + } + return partIds; } std::string MetaServiceUtils::schemaEdgePrefix(GraphSpaceID spaceId, EdgeType edgeType) { diff --git a/src/meta/MetaServiceUtils.h b/src/meta/MetaServiceUtils.h index deb7ae2bc38..ec2447ce5db 100644 --- a/src/meta/MetaServiceUtils.h +++ b/src/meta/MetaServiceUtils.h @@ -28,6 +28,8 @@ class MetaServiceUtils final { public: MetaServiceUtils() = delete; + static std::string lastUpdateTimeKey(); + static std::string spaceKey(GraphSpaceID spaceId); static std::string spaceVal(const cpp2::SpaceProperties &properties); @@ -42,6 +44,10 @@ class MetaServiceUtils final { static std::string partKey(GraphSpaceID spaceId, PartitionID partId); + static GraphSpaceID parsePartKeySpaceId(folly::StringPiece key); + + static PartitionID parsePartKeyPartId(folly::StringPiece key); + static std::string partVal(const std::vector& hosts); static const std::string& partPrefix(); @@ -52,13 +58,23 @@ class MetaServiceUtils final { static std::string hostKey(IPv4 ip, Port port); + static std::string hostKey(IPv4 ip, Port port, GraphSpaceID spaceId); + + static const std::string& hostPrefix(); + + static bool isHostKeyWithSpaceId(const folly::StringPiece& key); + + static nebula::cpp2::HostAddr parseHostKeyAddr(folly::StringPiece key); + + static GraphSpaceID parseHostKeySpaceId(folly::StringPiece key); + static std::string hostValOnline(); static std::string hostValOffline(); - static const std::string& hostPrefix(); + static std::string hostVal(const std::vector& partIds); - static nebula::cpp2::HostAddr parseHostKey(folly::StringPiece key); + static std::vector parseHostVal(folly::StringPiece val); static std::string schemaEdgePrefix(GraphSpaceID spaceId, EdgeType edgeType); diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 4540db2aee1..ef993e35e19 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -12,7 +12,6 @@ #include "meta/GflagsManager.h" #include "base/Configuration.h" -DEFINE_int32(load_data_interval_secs, 1, "Load data interval"); DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval"); DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry"); DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry"); @@ -26,12 +25,12 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool std::vector addrs, HostAddr localHost, ClusterID clusterId, - bool sendHeartBeat) + bool inStoraged) : ioThreadPool_(ioThreadPool) , addrs_(std::move(addrs)) , localHost_(localHost) , clusterId_(clusterId) - , sendHeartBeat_(sendHeartBeat) { + , inStoraged_(inStoraged) { CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required"; CHECK(!addrs_.empty()) << "No meta server address is specified. Meta server is required"; @@ -52,16 +51,16 @@ MetaClient::~MetaClient() { bool MetaClient::isMetadReady() { - if (sendHeartBeat_) { - auto ret = heartbeat().get(); - if (!ret.ok() && ret.status() != Status::LeaderChanged()) { - LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); - ready_ = false; - return ready_; - } - } // end if + auto ret = heartbeat().get(); + if (!ret.ok() && ret.status() != Status::LeaderChanged()) { + LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); + ready_ = false; + return ready_; + } + loadData(); loadCfg(); + localLastUpdateTime_ = metadLastUpdateTime_; return ready_; } @@ -83,15 +82,11 @@ bool MetaClient::waitForMetadReady(int count, int retryIntervalSecs) { } CHECK(bgThread_->start()); - if (sendHeartBeat_) { - LOG(INFO) << "Register time task for heartbeat!"; - size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900); - bgThread_->addTimerTask(delayMS, - FLAGS_heartbeat_interval_secs * 1000, - &MetaClient::heartBeatThreadFunc, this); - } - addLoadDataTask(); - addLoadCfgTask(); + LOG(INFO) << "Register time task for heartbeat!"; + size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900); + bgThread_->addTimerTask(delayMS, + FLAGS_heartbeat_interval_secs * 1000, + &MetaClient::heartBeatThreadFunc, this); return ready_; } @@ -110,11 +105,15 @@ void MetaClient::heartBeatThreadFunc() { LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); return; } -} -void MetaClient::loadDataThreadFunc() { - loadData(); - addLoadDataTask(); + // if MetaServer has some changes, refesh the localCache_ + if (localLastUpdateTime_ < metadLastUpdateTime_) { + bool ldRet = loadData(); + bool lcRet = loadCfg(); + if (ldRet && lcRet) { + localLastUpdateTime_ = metadLastUpdateTime_; + } + } } bool MetaClient::loadData() { @@ -187,12 +186,6 @@ bool MetaClient::loadData() { return true; } -void MetaClient::addLoadDataTask() { - size_t delayMS = FLAGS_load_data_interval_secs * 1000 + folly::Random::rand32(900); - bgThread_->addDelayTask(delayMS, &MetaClient::loadDataThreadFunc, this); -} - - bool MetaClient::loadSchemas(GraphSpaceID spaceId, std::shared_ptr spaceInfoCache, SpaceTagNameIdMap &tagNameIdMap, @@ -1110,7 +1103,6 @@ StatusOr MetaClient::getNewestTagVerFromCache(const GraphSpaceID& spa return it->second; } - StatusOr MetaClient::getNewestEdgeVerFromCache(const GraphSpaceID& space, const EdgeType& edgeType) { if (!ready_) { @@ -1124,17 +1116,31 @@ StatusOr MetaClient::getNewestEdgeVerFromCache(const GraphSpaceID& sp return it->second; } - folly::Future> MetaClient::heartbeat() { if (clusterId_.load() == 0) { clusterId_ = ClusterIdMan::getClusterIdFromFile(FLAGS_cluster_id_path); } cpp2::HBReq req; + req.set_in_storaged(inStoraged_); nebula::cpp2::HostAddr thriftHost; thriftHost.set_ip(localHost_.first); thriftHost.set_port(localHost_.second); req.set_host(std::move(thriftHost)); req.set_cluster_id(clusterId_.load()); + + if (inStoraged_ && listener_ != nullptr) { + LeaderIds leaderIds; + listener_->fetchLeaderInfo(leaderIds); + if (leaderIds_ != leaderIds) { + { + folly::RWSpinLock::WriteHolder holder(leaderIdsLock_); + leaderIds_.clear(); + leaderIds_ = leaderIds; + } + req.set_leader_parts(std::move(leaderIds)); + } + } + folly::Promise> promise; auto future = promise.getFuture(); LOG(INFO) << "Send heartbeat to " << leader_ << ", clusterId " << req.get_cluster_id(); @@ -1151,7 +1157,9 @@ folly::Future> MetaClient::heartbeat() { << FLAGS_cluster_id_path; } } - return true; + metadLastUpdateTime_ = resp.get_last_update_time_in_ms(); + LOG(INFO) << "Metad last update time: " << metadLastUpdateTime_; + return true; // resp.code == cpp2::ErrorCode::SUCCEEDED }, std::move(promise), true); return future; } @@ -1273,11 +1281,6 @@ MetaClient::listConfigs(const cpp2::ConfigModule& module) { return future; } -void MetaClient::loadCfgThreadFunc() { - loadCfg(); - addLoadCfgTask(); -} - bool MetaClient::registerCfg() { auto ret = regConfig(gflagsDeclared_).get(); if (ret.ok()) { @@ -1287,9 +1290,9 @@ bool MetaClient::registerCfg() { return configReady_; } -void MetaClient::loadCfg() { +bool MetaClient::loadCfg() { if (!configReady_ && !registerCfg()) { - return; + return false; } // only load current module's config is enough auto ret = listConfigs(gflagsModule_).get(); @@ -1319,14 +1322,9 @@ void MetaClient::loadCfg() { } } else { LOG(INFO) << "Load configs failed: " << ret.status(); - return; + return false; } -} - -void MetaClient::addLoadCfgTask() { - size_t delayMS = FLAGS_load_data_interval_secs * 1000 + folly::Random::rand32(900); - bgThread_->addDelayTask(delayMS, &MetaClient::loadCfgThreadFunc, this); - LOG(INFO) << "Load configs completed, call after " << delayMS << " ms"; + return true; } void MetaClient::updateGflagsValue(const ConfigItem& item) { diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index 4dcc8c461aa..2ff903969bb 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -44,6 +44,8 @@ struct SpaceInfoCache { using LocalCache = std::unordered_map>; +using LeaderIds = std::unordered_map>; + using SpaceNameIdMap = std::unordered_map; // get tagID via spaceId and tagName using SpaceTagNameIdMap = std::unordered_map, TagID>; @@ -91,6 +93,8 @@ class MetaChangedListener { virtual void onPartAdded(const PartMeta& partMeta) = 0; virtual void onPartRemoved(GraphSpaceID spaceId, PartitionID partId) = 0; virtual void onPartUpdated(const PartMeta& partMeta) = 0; + virtual void fetchLeaderInfo(std::unordered_map>& leaderIds) = 0; }; class MetaClient { @@ -108,7 +112,7 @@ class MetaClient { std::vector addrs, HostAddr localHost = HostAddr(0, 0), ClusterID clusterId = 0, - bool sendHeartBeat = false); + bool inStoraged = true); virtual ~MetaClient(); @@ -277,18 +281,12 @@ class MetaClient { Status refreshCache(); protected: - void loadDataThreadFunc(); // Return true if load succeeded. bool loadData(); - - void addLoadDataTask(); - + bool loadCfg(); void heartBeatThreadFunc(); - void loadCfgThreadFunc(); - void loadCfg(); bool registerCfg(); - void addLoadCfgTask(); void updateGflagsValue(const ConfigItem& item); void updateNestedGflags(const std::string& name); @@ -359,6 +357,11 @@ class MetaClient { HostAddr leader_; HostAddr localHost_; + int64_t localLastUpdateTime_{0}; + int64_t metadLastUpdateTime_{0}; + LeaderIds leaderIds_; + folly::RWSpinLock leaderIdsLock_; + std::unique_ptr bgThread_; SpaceNameIdMap spaceIndexByName_; SpaceTagNameIdMap spaceTagIndexByName_; @@ -366,13 +369,13 @@ class MetaClient { SpaceEdgeTypeNameMap spaceEdgeIndexByType_; SpaceNewestTagVerMap spaceNewestTagVerMap_; SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_; - SpaceAllEdgeMap spaceAllEdgeMap_; + SpaceAllEdgeMap spaceAllEdgeMap_; folly::RWSpinLock localCacheLock_; MetaChangedListener* listener_{nullptr}; folly::RWSpinLock listenerLock_; std::atomic clusterId_{0}; bool isRunning_{false}; - bool sendHeartBeat_{false}; + bool inStoraged_{true}; std::atomic_bool ready_{false}; MetaConfigMap metaConfigMap_; folly::RWSpinLock configCacheLock_; diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index 2bed2a7b86a..cdd5b1244d5 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -13,11 +13,12 @@ #include #include "base/StatusOr.h" #include "time/Duration.h" +#include "network/NetworkUtils.h" #include "kvstore/KVStore.h" #include "meta/MetaServiceUtils.h" #include "meta/common/MetaCommon.h" -#include "network/NetworkUtils.h" #include "meta/processors/Common.h" +#include "meta/ActiveMan.h" namespace nebula { namespace meta { @@ -129,7 +130,7 @@ class BaseProcessor { /** * General put function. * */ - void doPut(std::vector data); + void doPut(std::vector data, bool refresh = true); StatusOr> doPrefix(const std::string& key); @@ -146,13 +147,14 @@ class BaseProcessor { /** * General remove function. * */ - void doRemove(const std::string& key); + void doRemove(const std::string& key, bool refresh = true); /** * Remove keys from start to end, doesn't contain end. * */ void doRemoveRange(const std::string& start, - const std::string& end); + const std::string& end, + bool refresh = true); /** * Scan keys from start to end, doesn't contain end. @@ -162,7 +164,7 @@ class BaseProcessor { /** * General multi remove function. **/ - void doMultiRemove(std::vector keys); + void doMultiRemove(std::vector keys, bool refresh = true); /** * Get all hosts diff --git a/src/meta/processors/BaseProcessor.inl b/src/meta/processors/BaseProcessor.inl index e8cec4e6882..b99d4b976ce 100644 --- a/src/meta/processors/BaseProcessor.inl +++ b/src/meta/processors/BaseProcessor.inl @@ -11,7 +11,13 @@ namespace nebula { namespace meta { template -void BaseProcessor::doPut(std::vector data) { +void BaseProcessor::doPut(std::vector data, bool refresh) { + if (refresh) { + int64_t now = time::WallClock::fastNowInMilliSec(); + std::string lastUpdateTime = TimeInfo::encode(TimeInfo(now)); + data.emplace_back(MetaServiceUtils::lastUpdateTimeKey(), lastUpdateTime); + } + kvstore_->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data), @@ -62,7 +68,11 @@ BaseProcessor::doMultiGet(const std::vector& keys) { template -void BaseProcessor::doRemove(const std::string& key) { +void BaseProcessor::doRemove(const std::string& key, bool refresh) { + if (refresh) { + LastUpdateTimeMan::update(kvstore_, TimeInfo(time::WallClock::fastNowInMilliSec())); + } + kvstore_->asyncRemove(kDefaultSpaceId, kDefaultPartId, key, @@ -74,7 +84,11 @@ void BaseProcessor::doRemove(const std::string& key) { template -void BaseProcessor::doMultiRemove(std::vector keys) { +void BaseProcessor::doMultiRemove(std::vector keys, bool refresh) { + if (refresh) { + LastUpdateTimeMan::update(kvstore_, TimeInfo(time::WallClock::fastNowInMilliSec())); + } + kvstore_->asyncMultiRemove(kDefaultSpaceId, kDefaultPartId, std::move(keys), @@ -87,7 +101,12 @@ void BaseProcessor::doMultiRemove(std::vector keys) { template void BaseProcessor::doRemoveRange(const std::string& start, - const std::string& end) { + const std::string& end, + bool refresh) { + if (refresh) { + LastUpdateTimeMan::update(kvstore_, TimeInfo(time::WallClock::fastNowInMilliSec())); + } + kvstore_->asyncRemoveRange(kDefaultSpaceId, kDefaultPartId, start, diff --git a/src/meta/processors/Common.h b/src/meta/processors/Common.h index 4018252cd58..43a645d47dd 100644 --- a/src/meta/processors/Common.h +++ b/src/meta/processors/Common.h @@ -26,6 +26,7 @@ class LockUtils { return l; \ } +GENERATE_LOCK(lastUpdateTime); GENERATE_LOCK(space); GENERATE_LOCK(id); GENERATE_LOCK(tag); diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index 60dde4ccb5d..eefad44fb72 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -7,7 +7,7 @@ #include "meta/processors/admin/AdminClient.h" #include "meta/MetaServiceUtils.h" #include "meta/processors/Common.h" -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" #include "kvstore/Part.h" DEFINE_int32(max_retry_times_admin_op, 30, "max retry times for admin request!"); diff --git a/src/meta/processors/admin/BalancePlan.cpp b/src/meta/processors/admin/BalancePlan.cpp index 3eef9840fd5..bcb133ea65a 100644 --- a/src/meta/processors/admin/BalancePlan.cpp +++ b/src/meta/processors/admin/BalancePlan.cpp @@ -7,7 +7,7 @@ #include "meta/processors/admin/BalancePlan.h" #include #include "meta/processors/Common.h" -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" DEFINE_uint32(task_concurrency, 10, "The tasks number could be invoked simultaneously"); diff --git a/src/meta/processors/admin/BalanceProcessor.h b/src/meta/processors/admin/BalanceProcessor.h index 82fb712b89b..b75dbd27ee8 100644 --- a/src/meta/processors/admin/BalanceProcessor.h +++ b/src/meta/processors/admin/BalanceProcessor.h @@ -9,7 +9,7 @@ #include #include "meta/processors/BaseProcessor.h" -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" namespace nebula { namespace meta { diff --git a/src/meta/processors/admin/BalanceTask.cpp b/src/meta/processors/admin/BalanceTask.cpp index 8b2d2572fa2..2208f53c54e 100644 --- a/src/meta/processors/admin/BalanceTask.cpp +++ b/src/meta/processors/admin/BalanceTask.cpp @@ -149,6 +149,10 @@ void BalanceTask::invoke() { ret_ = Result::FAILED; } else { status_ = Status::REMOVE_PART_ON_SRC; + if (kv_ != nullptr) { + auto now = time::WallClock::fastNowInMilliSec(); + LastUpdateTimeMan::update(kv_, TimeInfo(now)); + } } invoke(); }); diff --git a/src/meta/processors/admin/BalanceTask.h b/src/meta/processors/admin/BalanceTask.h index 989534071f0..6e98a97be97 100644 --- a/src/meta/processors/admin/BalanceTask.h +++ b/src/meta/processors/admin/BalanceTask.h @@ -8,7 +8,7 @@ #define META_ADMIN_BALANCETASK_H_ #include -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" #include "time/WallClock.h" #include "kvstore/KVStore.h" #include "network/NetworkUtils.h" diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp index dc35518618f..f896a19fc66 100644 --- a/src/meta/processors/admin/Balancer.cpp +++ b/src/meta/processors/admin/Balancer.cpp @@ -8,7 +8,6 @@ #include #include #include "meta/processors/Common.h" -#include "meta/ActiveHostsMan.h" #include "meta/MetaServiceUtils.h" #include "network/NetworkUtils.h" diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index a4db3ffebb8..f6d3cd68efc 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -7,7 +7,7 @@ #include "meta/processors/admin/HBProcessor.h" #include "time/WallClock.h" -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" #include "meta/ClusterIdMan.h" DEFINE_bool(hosts_whitelist_enabled, false, "Check host whether in whitelist when received hb"); @@ -39,8 +39,20 @@ void HBProcessor::process(const cpp2::HBReq& req) { } LOG(INFO) << "Receive heartbeat from " << host; - HostInfo info(time::WallClock::fastNowInMilliSec()); - auto ret = ActiveHostsMan::updateHostInfo(kvstore_, host, info); + std::vector data; + auto ret = kvstore::ResultCode::SUCCEEDED; + if (req.get_in_storaged()) { + TimeInfo hbInfo(time::WallClock::fastNowInMilliSec()); + ret = ActiveHostsMan::updateHostInfo(this->kvstore_, host, hbInfo); + + if (req.__isset.leader_parts) { + const auto& leaderIds = req.get_leader_parts(); + std::for_each(leaderIds->begin(), leaderIds->end(), [&] (auto& lp) { + data.emplace_back(MetaServiceUtils::hostKey(req.host.ip, req.host.port, lp.first), + MetaServiceUtils::hostVal(lp.second)); + }); + } + } resp_.set_code(to(ret)); if (ret == kvstore::ResultCode::ERR_LEADER_CHANGED) { auto leaderRet = kvstore_->partLeader(kDefaultSpaceId, kDefaultPartId); @@ -48,7 +60,15 @@ void HBProcessor::process(const cpp2::HBReq& req) { resp_.set_leader(toThriftHost(nebula::value(leaderRet))); } } - onFinished(); + + int64_t lastUpdateTime = LastUpdateTimeMan::get(this->kvstore_); + resp_.set_last_update_time_in_ms(lastUpdateTime); + + if (!data.empty()) { + doPut(std::move(data), false); + } else { + onFinished(); + } } } // namespace meta diff --git a/src/meta/processors/customKV/MultiPutProcessor.cpp b/src/meta/processors/customKV/MultiPutProcessor.cpp index 6ab93ac6bc7..0bcedda101b 100644 --- a/src/meta/processors/customKV/MultiPutProcessor.cpp +++ b/src/meta/processors/customKV/MultiPutProcessor.cpp @@ -16,7 +16,7 @@ void MultiPutProcessor::process(const cpp2::MultiPutReq& req) { data.emplace_back(MetaServiceUtils::assembleSegmentKey(req.get_segment(), pair.get_key()), pair.get_value()); } - doPut(std::move(data)); + doPut(std::move(data), false); } } // namespace meta diff --git a/src/meta/processors/customKV/RemoveProcessor.cpp b/src/meta/processors/customKV/RemoveProcessor.cpp index a19e6540efa..7e157ee23ab 100644 --- a/src/meta/processors/customKV/RemoveProcessor.cpp +++ b/src/meta/processors/customKV/RemoveProcessor.cpp @@ -12,7 +12,7 @@ namespace meta { void RemoveProcessor::process(const cpp2::RemoveReq& req) { CHECK_SEGMENT(req.get_segment()); auto key = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_key()); - doRemove(key); + doRemove(key, false); } } // namespace meta diff --git a/src/meta/processors/customKV/RemoveRangeProcessor.cpp b/src/meta/processors/customKV/RemoveRangeProcessor.cpp index 905fcd916e1..46346d54937 100644 --- a/src/meta/processors/customKV/RemoveRangeProcessor.cpp +++ b/src/meta/processors/customKV/RemoveRangeProcessor.cpp @@ -13,7 +13,7 @@ void RemoveRangeProcessor::process(const cpp2::RemoveRangeReq& req) { CHECK_SEGMENT(req.get_segment()); auto start = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_start()); auto end = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_end()); - doRemoveRange(start, end); + doRemoveRange(start, end, false); } } // namespace meta diff --git a/src/meta/processors/partsMan/CreateSpaceProcessor.cpp b/src/meta/processors/partsMan/CreateSpaceProcessor.cpp index a0e872ec078..c04d855f307 100644 --- a/src/meta/processors/partsMan/CreateSpaceProcessor.cpp +++ b/src/meta/processors/partsMan/CreateSpaceProcessor.cpp @@ -5,7 +5,7 @@ */ #include "meta/processors/partsMan/CreateSpaceProcessor.h" -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" DEFINE_int32(default_parts_num, 1024, "The default number of parts when a space is created"); DEFINE_int32(default_replica_factor, 1, "The default replica factor when a space is created"); diff --git a/src/meta/processors/partsMan/ListHostsProcessor.cpp b/src/meta/processors/partsMan/ListHostsProcessor.cpp index 0604ae373f9..9c584e04d52 100644 --- a/src/meta/processors/partsMan/ListHostsProcessor.cpp +++ b/src/meta/processors/partsMan/ListHostsProcessor.cpp @@ -5,7 +5,7 @@ */ #include "meta/processors/partsMan/ListHostsProcessor.h" -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" #include "meta/processors/admin/AdminClient.h" DECLARE_int32(expired_threshold_sec); @@ -49,21 +49,21 @@ StatusOr> ListHostsProcessor::allHostsWithStatus( auto now = time::WallClock::fastNowInMilliSec(); std::vector removeHostsKey; while (iter->valid()) { - cpp2::HostItem item; - nebula::cpp2::HostAddr host; - auto hostAddrPiece = iter->key().subpiece(hostPrefix.size()); - memcpy(&host, hostAddrPiece.data(), hostAddrPiece.size()); - item.set_hostAddr(host); - HostInfo info = HostInfo::decode(iter->val()); - if (now - info.lastHBTimeInMilliSec_ < FLAGS_removed_threshold_sec * 1000) { - if (now - info.lastHBTimeInMilliSec_ < FLAGS_expired_threshold_sec * 1000) { - item.set_status(cpp2::HostStatus::ONLINE); + if (!MetaServiceUtils::isHostKeyWithSpaceId(iter->key())) { + cpp2::HostItem item; + auto host = MetaServiceUtils::parseHostKeyAddr(iter->key()); + item.set_hostAddr(host); + TimeInfo info = TimeInfo::decode(iter->val()); + if (now - info.lastTime_ < FLAGS_removed_threshold_sec * 1000) { + if (now - info.lastTime_ < FLAGS_expired_threshold_sec * 1000) { + item.set_status(cpp2::HostStatus::ONLINE); + } else { + item.set_status(cpp2::HostStatus::OFFLINE); + } + hostItems.emplace_back(item); } else { - item.set_status(cpp2::HostStatus::OFFLINE); + removeHostsKey.emplace_back(iter->key()); } - hostItems.emplace_back(item); - } else { - removeHostsKey.emplace_back(iter->key()); } iter->next(); } @@ -97,9 +97,7 @@ StatusOr> ListHostsProcessor::allHostsWithStatus( return Status::Error("Cant't find any partitions"); } while (iter->valid()) { - auto key = iter->key(); - PartitionID partId; - memcpy(&partId, key.data() + partPrefix.size(), sizeof(PartitionID)); + PartitionID partId = MetaServiceUtils::parsePartKeyPartId(iter->key()); auto partHosts = MetaServiceUtils::parsePartVal(iter->val()); for (auto& host : partHosts) { hostParts[HostAddr(host.ip, host.port)].emplace_back(partId); diff --git a/src/meta/processors/partsMan/ListPartsProcessor.cpp b/src/meta/processors/partsMan/ListPartsProcessor.cpp index 7f3a8bb9d56..3fa8376665f 100644 --- a/src/meta/processors/partsMan/ListPartsProcessor.cpp +++ b/src/meta/processors/partsMan/ListPartsProcessor.cpp @@ -4,7 +4,7 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" #include "meta/processors/partsMan/ListPartsProcessor.h" #include "meta/processors/admin/AdminClient.h" diff --git a/src/meta/test/ActiveHostsManTest.cpp b/src/meta/test/ActiveManTest.cpp similarity index 53% rename from src/meta/test/ActiveHostsManTest.cpp rename to src/meta/test/ActiveManTest.cpp index db7203d7522..a8eee6f4601 100644 --- a/src/meta/test/ActiveHostsManTest.cpp +++ b/src/meta/test/ActiveManTest.cpp @@ -6,7 +6,7 @@ #include "base/Base.h" #include #include -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" #include "fs/TempDir.h" #include "meta/test/TestUtils.h" @@ -15,17 +15,41 @@ DECLARE_int32(expired_threshold_sec); namespace nebula { namespace meta { +TEST(LastUpdateTimeManTest, NormalTest) { + fs::TempDir rootPath("/tmp/LastUpdateTimeManTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + + ASSERT_EQ(0, LastUpdateTimeMan::get(kv.get())); + int64_t now = time::WallClock::fastNowInMilliSec(); + + LastUpdateTimeMan::update(kv.get(), TimeInfo(now)); + ASSERT_EQ(now, LastUpdateTimeMan::get(kv.get())); + LastUpdateTimeMan::update(kv.get(), TimeInfo(now + 100)); + ASSERT_EQ(now + 100, LastUpdateTimeMan::get(kv.get())); + + LastUpdateTimeMan::update(kv.get(), TimeInfo(now - 100)); + { + auto key = MetaServiceUtils::lastUpdateTimeKey(); + std::string val; + auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val); + ASSERT_EQ(ret, kvstore::ResultCode::SUCCEEDED); + ASSERT_EQ(now - 100, TimeInfo::decode(val).lastTime_); + } +} + + TEST(ActiveHostsManTest, NormalTest) { fs::TempDir rootPath("/tmp/ActiveHostsManTest.XXXXXX"); FLAGS_expired_threshold_sec = 2; std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + auto now = time::WallClock::fastNowInMilliSec(); - ActiveHostsMan::updateHostInfo(kv.get(), HostAddr(0, 0), HostInfo(now)); - ActiveHostsMan::updateHostInfo(kv.get(), HostAddr(0, 1), HostInfo(now)); - ActiveHostsMan::updateHostInfo(kv.get(), HostAddr(0, 2), HostInfo(now)); + ActiveHostsMan::updateHostInfo(kv.get(), HostAddr(0, 0), TimeInfo(now)); + ActiveHostsMan::updateHostInfo(kv.get(), HostAddr(0, 1), TimeInfo(now)); + ActiveHostsMan::updateHostInfo(kv.get(), HostAddr(0, 2), TimeInfo(now)); ASSERT_EQ(3, ActiveHostsMan::getActiveHosts(kv.get()).size()); - ActiveHostsMan::updateHostInfo(kv.get(), HostAddr(0, 0), HostInfo(now + 2000)); + ActiveHostsMan::updateHostInfo(kv.get(), HostAddr(0, 0), TimeInfo(now + 2000)); ASSERT_EQ(3, ActiveHostsMan::getActiveHosts(kv.get()).size()); { const auto& prefix = MetaServiceUtils::hostPrefix(); @@ -34,13 +58,13 @@ TEST(ActiveHostsManTest, NormalTest) { CHECK_EQ(kvstore::ResultCode::SUCCEEDED, ret); int i = 0; while (iter->valid()) { - auto host = MetaServiceUtils::parseHostKey(iter->key()); - HostInfo info = HostInfo::decode(iter->val()); + auto host = MetaServiceUtils::parseHostKeyAddr(iter->key()); + TimeInfo info = TimeInfo::decode(iter->val()); ASSERT_EQ(HostAddr(0, i), HostAddr(host.ip, host.port)); if (i == 0) { - ASSERT_EQ(HostInfo(now + 2000), info); + ASSERT_EQ(TimeInfo(now + 2000), info); } else { - ASSERT_EQ(HostInfo(now), info); + ASSERT_EQ(TimeInfo(now), info); } iter->next(); i++; diff --git a/src/meta/test/BalanceIntegrationTest.cpp b/src/meta/test/BalanceIntegrationTest.cpp index 7b9fa54f984..fc2ecf19c84 100644 --- a/src/meta/test/BalanceIntegrationTest.cpp +++ b/src/meta/test/BalanceIntegrationTest.cpp @@ -15,7 +15,6 @@ #include "storage/test/TestUtils.h" #include "dataman/RowWriter.h" -DECLARE_int32(load_data_interval_secs); DECLARE_int32(heartbeat_interval_secs); DECLARE_uint32(raft_heartbeat_interval_secs); DECLARE_int32(expired_threshold_sec); @@ -24,7 +23,6 @@ namespace nebula { namespace meta { TEST(BalanceIntegrationTest, BalanceTest) { - FLAGS_load_data_interval_secs = 1; FLAGS_heartbeat_interval_secs = 1; FLAGS_raft_heartbeat_interval_secs = 1; FLAGS_expired_threshold_sec = 3; @@ -49,8 +47,11 @@ TEST(BalanceIntegrationTest, BalanceTest) { LOG(INFO) << "Create meta client..."; uint32_t tempDataPort = network::NetworkUtils::getAvailablePort(); HostAddr tempDataAddr(localIp, tempDataPort); - auto mClient = std::make_unique(threadPool, metaAddr, tempDataAddr, - kClusterId, false); + auto mClient = std::make_unique(threadPool, + metaAddr, + tempDataAddr, + kClusterId, + false); mClient->waitForMetadReady(); @@ -69,8 +70,11 @@ TEST(BalanceIntegrationTest, BalanceTest) { VLOG(1) << "The storage server has been added to the meta service"; - auto metaClient = std::make_shared(threadPool, metaAddr, storageAddr, - kClusterId, true); + auto metaClient = std::make_shared(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); metaClient->waitForMetadReady(); metaClients.emplace_back(metaClient); } @@ -101,7 +105,7 @@ TEST(BalanceIntegrationTest, BalanceTest) { auto tagRet = mClient->createTagSchema(spaceId, "tag", std::move(schema)).get(); ASSERT_TRUE(tagRet.ok()); auto tagId = tagRet.value(); - sleep(FLAGS_load_data_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); LOG(INFO) << "Let's write some data"; auto sClient = std::make_unique(threadPool, mClient.get()); @@ -172,8 +176,11 @@ TEST(BalanceIntegrationTest, BalanceTest) { uint32_t storagePort = network::NetworkUtils::getAvailablePort(); HostAddr storageAddr(localIp, storagePort); { - newMetaClient = std::make_unique(threadPool, metaAddr, storageAddr, - kClusterId, true); + newMetaClient = std::make_unique(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); newMetaClient->waitForMetadReady(); std::string dataPath = folly::stringPrintf("%s/%d/data", rootPath.path(), replica + 1); newServer = storage::TestUtils::mockStorageServer(newMetaClient.get(), @@ -245,7 +252,6 @@ TEST(BalanceIntegrationTest, BalanceTest) { } TEST(BalanceIntegrationTest, LeaderBalanceTest) { - FLAGS_load_data_interval_secs = 1; FLAGS_heartbeat_interval_secs = 1; FLAGS_raft_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/balance_integration_test.XXXXXX"); @@ -269,8 +275,11 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) { LOG(INFO) << "Create meta client..."; uint32_t tempDataPort = network::NetworkUtils::getAvailablePort(); HostAddr tempDataAddr(localIp, tempDataPort); - auto mClient = std::make_unique(threadPool, metaAddr, tempDataAddr, - kClusterId, false); + auto mClient = std::make_unique(threadPool, + metaAddr, + tempDataAddr, + kClusterId, + false); mClient->waitForMetadReady(); @@ -289,8 +298,11 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) { VLOG(1) << "The storage server has been added to the meta service"; - auto metaClient = std::make_shared(threadPool, metaAddr, storageAddr, - kClusterId, true); + auto metaClient = std::make_shared(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); metaClient->waitForMetadReady(); metaClients.emplace_back(metaClient); } @@ -307,7 +319,7 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) { auto ret = mClient->createSpace("storage", partition, replica).get(); ASSERT_TRUE(ret.ok()); - sleep(FLAGS_load_data_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); + sleep(FLAGS_heartbeat_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); auto code = balancer.leaderBalance(); ASSERT_EQ(code, cpp2::ErrorCode::SUCCEEDED); @@ -335,4 +347,3 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } - diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 6ad21b559c9..4ae6f597bdb 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -154,9 +154,9 @@ nebula_add_test( nebula_add_test( NAME - active_hosts_man_test + active_man_test SOURCES - ActiveHostsManTest.cpp + ActiveManTest.cpp OBJECTS $ $ diff --git a/src/meta/test/ConfigManTest.cpp b/src/meta/test/ConfigManTest.cpp index f286a7adeee..fedb488d253 100644 --- a/src/meta/test/ConfigManTest.cpp +++ b/src/meta/test/ConfigManTest.cpp @@ -21,7 +21,7 @@ #include "storage/test/TestUtils.h" #include "rocksdb/utilities/options_util.h" -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); DECLARE_string(rocksdb_db_options); DECLARE_string(rocksdb_column_family_options); @@ -265,7 +265,7 @@ ConfigItem toConfigItem(const cpp2::ConfigItem& item) { } TEST(ConfigManTest, MetaConfigManTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaConfigManTest.XXXXXX"); uint32_t localMetaPort = 0; auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); @@ -313,7 +313,7 @@ TEST(ConfigManTest, MetaConfigManTest) { std::string name = "not_existed"; auto type = cpp2::ConfigType::INT64; - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); // get/set without register auto setRet = cfgMan.setConfig(module, name, type, 101l).get(); ASSERT_FALSE(setRet.ok()); @@ -336,7 +336,7 @@ TEST(ConfigManTest, MetaConfigManTest) { auto value = boost::get(item.value_); ASSERT_EQ(value, 100); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_int64_key_immutable, 100); } // mutable config @@ -357,7 +357,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, 102); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_int64_key, 102); } { @@ -377,7 +377,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, true); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_bool_key, true); } { @@ -397,7 +397,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, 3.14); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_double_key, 3.14); } { @@ -418,7 +418,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(value, "abc"); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_string_key, "abc"); } { @@ -446,7 +446,7 @@ TEST(ConfigManTest, MetaConfigManTest) { ASSERT_EQ(val, "8"); // get from cache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); confRet = conf.parseFromString(FLAGS_nested_key); ASSERT_TRUE(confRet.ok()); status = conf.fetchAsString("max_background_compactions", val); @@ -461,7 +461,7 @@ TEST(ConfigManTest, MetaConfigManTest) { } TEST(ConfigManTest, MockConfigTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MockConfigTest.XXXXXX"); uint32_t localMetaPort = 0; auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); @@ -511,7 +511,7 @@ TEST(ConfigManTest, MockConfigTest) { } // check values in ClientBaseGflagsManager - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_test0, "updated0"); ASSERT_EQ(FLAGS_test1, "updated1"); ASSERT_EQ(FLAGS_test2, "updated2"); @@ -520,7 +520,7 @@ TEST(ConfigManTest, MockConfigTest) { } TEST(ConfigManTest, RocksdbOptionsTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/RocksdbOptionsTest.XXXXXX"); IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -542,8 +542,11 @@ TEST(ConfigManTest, RocksdbOptionsTest) { LOG(INFO) << "Create meta client..."; uint32_t storagePort = network::NetworkUtils::getAvailablePort(); HostAddr storageAddr(localIp, storagePort); - auto mClient = std::make_unique(threadPool, metaAddr, storageAddr, - kClusterId, true); + auto mClient = std::make_unique(threadPool, + metaAddr, + storageAddr, + kClusterId, + true); mClient->waitForMetadReady(); mClient->gflagsModule_ = module; @@ -576,7 +579,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) { auto ret = mClient->createSpace("storage", 9, 1).get(); ASSERT_TRUE(ret.ok()); auto spaceId = ret.value(); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); storage::TestUtils::waitUntilAllElected(sc->kvStore_.get(), spaceId, 9); { std::string name = "rocksdb_db_options"; @@ -591,7 +594,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) { auto item = getRet.value().front(); auto value = boost::get(item.get_value()); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_rocksdb_db_options, value); } { @@ -608,12 +611,12 @@ TEST(ConfigManTest, RocksdbOptionsTest) { auto item = getRet.value().front(); auto value = boost::get(item.get_value()); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_rocksdb_column_family_options, value); } { // need to sleep a bit to take effect on rocksdb - sleep(3); + sleep(FLAGS_heartbeat_interval_secs + 2); rocksdb::DBOptions loadedDbOpt; std::vector loadedCfDescs; std::string rocksPath = folly::stringPrintf("%s/disk1/nebula/%d/data", diff --git a/src/meta/test/HBProcessorTest.cpp b/src/meta/test/HBProcessorTest.cpp index 6eda99c2e0b..6a6bfd3c6ab 100644 --- a/src/meta/test/HBProcessorTest.cpp +++ b/src/meta/test/HBProcessorTest.cpp @@ -25,18 +25,47 @@ TEST(HBProcessorTest, HBTest) { std::unique_ptr kv(TestUtils::initKV(rootPath.path())); const ClusterID kClusterId = 10; { + int32_t count = 1; + int32_t verifyCount = 1; for (auto i = 0; i < 5; i++) { cpp2::HBReq req; + req.set_in_storaged(true); nebula::cpp2::HostAddr thriftHost; thriftHost.set_ip(i); thriftHost.set_port(i); req.set_host(std::move(thriftHost)); req.set_cluster_id(kClusterId); + LeaderIds thriftLeaderIds; + for (GraphSpaceID spaceId = 1; spaceId < 3; spaceId++) { + for (int32_t j = 0; j < 5; j++) { + thriftLeaderIds[spaceId].emplace_back(count + j); + } + count += 5; + } + req.set_leader_parts(std::move(thriftLeaderIds)); auto* processor = HBProcessor::instance(kv.get(), kClusterId); auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + + auto host = req.get_host(); + std::string prefix = MetaServiceUtils::hostKey(host.get_ip(), host.get_port()); + std::unique_ptr iter; + auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + CHECK_EQ(kvstore::ResultCode::SUCCEEDED, ret); + while (iter->valid()) { + if (MetaServiceUtils::isHostKeyWithSpaceId(iter->key())) { + auto spaceId = MetaServiceUtils::parseHostKeySpaceId(iter->key()); + EXPECT_EQ(spaceId, (verifyCount % 10) / 5 + 1); + auto ids = MetaServiceUtils::parseHostVal(iter->val()); + ASSERT_EQ(5, ids.size()); + for (auto& id : ids) { + EXPECT_EQ(id, verifyCount++); + } + } + iter->next(); + } } auto hosts = ActiveHostsMan::getActiveHosts(kv.get(), 1); ASSERT_EQ(5, hosts.size()); diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index 9a678d260dc..fd6675272c9 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -17,7 +17,6 @@ #include "meta/test/TestUtils.h" #include "meta/ClientBasedGflagsManager.h" -DECLARE_int32(load_data_interval_secs); DECLARE_int32(heartbeat_interval_secs); DECLARE_string(rocksdb_db_options); @@ -30,7 +29,7 @@ using nebula::cpp2::ValueType; using apache::thrift::FragileConstructor::FRAGILE; TEST(MetaClientTest, InterfacesTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); // Let the system choose an available port for us @@ -45,7 +44,9 @@ TEST(MetaClientTest, InterfacesTest) { HostAddr localHost{localIp, clientPort}; auto client = std::make_shared(threadPool, std::vector{HostAddr(localIp, sc->port_)}, - localHost); + localHost, + 0, + false); client->waitForMetadReady(); { // Add hosts automatically, then testing listHosts interface. @@ -117,7 +118,7 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_EQ(ret1.value().begin()->schema.columns.size(), 5); // getTagSchemaFromCache - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); auto ret = client->getNewestTagVerFromCache(spaceId, ret1.value().begin()->tag_id); CHECK(ret.ok()); @@ -190,7 +191,7 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_STREQ("edgeItem0", outSchema1->getFieldName(0)); } } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); { // Test cache interfaces auto partsMap = client->getPartsMapFromCache(HostAddr(0, 0)); @@ -287,7 +288,7 @@ TEST(MetaClientTest, InterfacesTest) { } TEST(MetaClientTest, TagTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTagTest.XXXXXX"); // Let the system choose an available port for us @@ -299,7 +300,7 @@ TEST(MetaClientTest, TagTest) { IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); auto localhosts = std::vector{HostAddr(localIp, sc->port_)}; - auto client = std::make_shared(threadPool, localhosts); + auto client = std::make_shared(threadPool, localhosts, HostAddr(0, 0), 0, false); std::vector hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}}; client->waitForMetadReady(); TestUtils::registerHB(sc->kvStore_.get(), hosts); @@ -405,6 +406,12 @@ class TestListener : public MetaChangedListener { partChanged++; } + void fetchLeaderInfo(std::unordered_map>& leaderIds) override { + LOG(INFO) << "Get leader distribution!"; + UNUSED(leaderIds); + } + HostAddr getLocalHost() { return HostAddr(0, 0); } @@ -416,7 +423,7 @@ class TestListener : public MetaChangedListener { }; TEST(MetaClientTest, DiffTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); // Let the system choose an available port for us @@ -428,7 +435,10 @@ TEST(MetaClientTest, DiffTest) { network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); auto listener = std::make_unique(); auto client = std::make_shared(threadPool, - std::vector{HostAddr(localIp, sc->port_)}); + std::vector{HostAddr(localIp, sc->port_)}, + HostAddr(0, 0), + 0, + false); client->waitForMetadReady(); client->registerListener(listener.get()); { @@ -448,14 +458,14 @@ TEST(MetaClientTest, DiffTest) { auto ret = client->createSpace("default_space", 9, 1).get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(1, listener->spaceNum); ASSERT_EQ(9, listener->partNum); { auto ret = client->createSpace("default_space_1", 5, 1).get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(2, listener->spaceNum); ASSERT_EQ(14, listener->partNum); { @@ -463,13 +473,12 @@ TEST(MetaClientTest, DiffTest) { auto ret = client->dropSpace("default_space_1").get(); ASSERT_TRUE(ret.ok()) << ret.status(); } - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(1, listener->spaceNum); ASSERT_EQ(9, listener->partNum); } TEST(MetaClientTest, HeartbeatTest) { - FLAGS_load_data_interval_secs = 5; FLAGS_heartbeat_interval_secs = 1; const nebula::ClusterID kClusterId = 10; fs::TempDir rootPath("/tmp/MetaClientTest.XXXXXX"); @@ -486,7 +495,7 @@ TEST(MetaClientTest, HeartbeatTest) { std::vector{HostAddr(localIp, 10001)}, localHost, kClusterId, - true); // send heartbeat + true); client->waitForMetadReady(); client->registerListener(listener.get()); { @@ -553,6 +562,7 @@ class TestMetaServiceRetry : public cpp2::MetaServiceSvIf { }; TEST(MetaClientTest, SimpleTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -578,6 +588,7 @@ TEST(MetaClientTest, SimpleTest) { } TEST(MetaClientTest, RetryWithExceptionTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -600,6 +611,7 @@ TEST(MetaClientTest, RetryWithExceptionTest) { } TEST(MetaClientTest, RetryOnceTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -639,6 +651,7 @@ TEST(MetaClientTest, RetryOnceTest) { } TEST(MetaClientTest, RetryUntilLimitTest) { + FLAGS_heartbeat_interval_secs = 3600; IPv4 localIp; network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); @@ -677,7 +690,7 @@ TEST(MetaClientTest, RetryUntilLimitTest) { } TEST(MetaClientTest, RocksdbOptionsTest) { - FLAGS_load_data_interval_secs = 1; + FLAGS_heartbeat_interval_secs = 1; fs::TempDir rootPath("/tmp/RocksdbOptionsTest.XXXXXX"); uint32_t localMetaPort = 0; auto sc = TestUtils::mockMetaServer(localMetaPort, rootPath.path()); @@ -713,7 +726,7 @@ TEST(MetaClientTest, RocksdbOptionsTest) { std::vector hosts = {{0, 0}}; TestUtils::registerHB(sc->kvStore_.get(), hosts); client->createSpace("default_space", 9, 1).get(); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); } { std::string name = "rocksdb_db_options"; @@ -730,7 +743,7 @@ TEST(MetaClientTest, RocksdbOptionsTest) { auto item = getRet.value().front(); auto value = boost::get(item.get_value()); - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); ASSERT_EQ(FLAGS_rocksdb_db_options, value); ASSERT_EQ(listener->options["write_buffer_size"], "2097152"); ASSERT_EQ(listener->options["disable_auto_compactions"], "true"); diff --git a/src/meta/test/MetaHttpDownloadHandlerTest.cpp b/src/meta/test/MetaHttpDownloadHandlerTest.cpp index c6cfa5f0a9b..2054525cd7e 100644 --- a/src/meta/test/MetaHttpDownloadHandlerTest.cpp +++ b/src/meta/test/MetaHttpDownloadHandlerTest.cpp @@ -14,7 +14,6 @@ #include "storage/StorageHttpDownloadHandler.h" #include "fs/TempDir.h" -DECLARE_int32(load_data_interval_secs); DECLARE_string(pid_file); DECLARE_int32(ws_storage_http_port); diff --git a/src/meta/test/MetaHttpStatusHandlerTest.cpp b/src/meta/test/MetaHttpStatusHandlerTest.cpp index 830ed25c3d3..6e3257187d5 100644 --- a/src/meta/test/MetaHttpStatusHandlerTest.cpp +++ b/src/meta/test/MetaHttpStatusHandlerTest.cpp @@ -13,7 +13,6 @@ #include "meta/test/TestUtils.h" #include "fs/TempDir.h" -DECLARE_int32(load_data_interval_secs); DECLARE_string(pid_file); namespace nebula { diff --git a/src/meta/test/MetaServiceUtilsTest.cpp b/src/meta/test/MetaServiceUtilsTest.cpp index 4ddb02a3d9c..fbb1cafcc89 100644 --- a/src/meta/test/MetaServiceUtilsTest.cpp +++ b/src/meta/test/MetaServiceUtilsTest.cpp @@ -31,6 +31,8 @@ TEST(MetaServiceUtilsTest, SpaceKeyTest) { TEST(MetaServiceUtilsTest, PartKeyTest) { auto partKey = MetaServiceUtils::partKey(0, 1); + ASSERT_EQ(0, MetaServiceUtils::parsePartKeySpaceId(partKey)); + ASSERT_EQ(1, MetaServiceUtils::parsePartKeyPartId(partKey)); auto prefix = MetaServiceUtils::partPrefix(0); ASSERT_EQ("__parts__", prefix.substr(0, prefix.size() - sizeof(GraphSpaceID))); ASSERT_EQ(0, *reinterpret_cast( @@ -57,15 +59,44 @@ TEST(MetaServiceUtilsTest, PartKeyTest) { TEST(MetaServiceUtilsTest, HostKeyTest) { auto hostKey = MetaServiceUtils::hostKey(10, 11); + ASSERT_FALSE(MetaServiceUtils::isHostKeyWithSpaceId(hostKey)); const auto& prefix = MetaServiceUtils::hostPrefix(); ASSERT_EQ("__hosts__", prefix); ASSERT_EQ(prefix, hostKey.substr(0, hostKey.size() - 2 * sizeof(int32_t))); ASSERT_EQ(10, *reinterpret_cast(hostKey.c_str() + prefix.size())); ASSERT_EQ(11, *reinterpret_cast(hostKey.c_str() + prefix.size() + sizeof(IPv4))); - auto addr = MetaServiceUtils::parseHostKey(hostKey); + auto addr = MetaServiceUtils::parseHostKeyAddr(hostKey); ASSERT_EQ(10, addr.get_ip()); ASSERT_EQ(11, addr.get_port()); + + // test leader partition information + GraphSpaceID spaceId = 12; + auto newHostKey = MetaServiceUtils::hostKey(10, 11, spaceId); + ASSERT_TRUE(MetaServiceUtils::isHostKeyWithSpaceId(newHostKey)); + auto offset = sizeof(IPv4) + sizeof(Port) + sizeof(GraphSpaceID); + ASSERT_EQ(prefix, newHostKey.substr(0, newHostKey.size() - offset)); + addr = MetaServiceUtils::parseHostKeyAddr(newHostKey); + ASSERT_EQ(10, addr.get_ip()); + ASSERT_EQ(11, addr.get_port()); + ASSERT_EQ(12, *reinterpret_cast(newHostKey.c_str() + + prefix.size() + + sizeof(IPv4) + + sizeof(Port))); + ASSERT_EQ(spaceId, MetaServiceUtils::parseHostKeySpaceId(newHostKey)); + + std::vector partIds; + for (PartitionID i = 101; i < 1024; i++) { + partIds.emplace_back(i); + } + auto val = MetaServiceUtils::hostVal(partIds); + ASSERT_EQ((1024 - 101) * sizeof(PartitionID), val.size()); + + auto ids = MetaServiceUtils::parseHostVal(val); + ASSERT_EQ(partIds.size(), ids.size()); + for (size_t i = 0; i < ids.size(); i++) { + ASSERT_EQ(partIds[i], ids[i]); + } } TEST(MetaServiceUtilsTest, TagTest) { diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index 819ad9fc181..6f4ead31437 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -20,7 +20,7 @@ #include "meta/processors/usersMan/AuthenticationProcessor.h" #include "interface/gen-cpp2/common_types.h" #include "time/WallClock.h" -#include "meta/ActiveHostsMan.h" +#include "meta/ActiveMan.h" #include DECLARE_string(part_man_type); @@ -156,7 +156,7 @@ class TestUtils { static void registerHB(kvstore::KVStore* kv, const std::vector& hosts) { auto now = time::WallClock::fastNowInMilliSec(); for (auto& h : hosts) { - auto ret = ActiveHostsMan::updateHostInfo(kv, h, HostInfo(now)); + auto ret = ActiveHostsMan::updateHostInfo(kv, h, TimeInfo(now)); CHECK_EQ(ret, kvstore::ResultCode::SUCCEEDED); } } diff --git a/src/storage/UpdateEdgeProcessor.cpp b/src/storage/UpdateEdgeProcessor.cpp index 76f1da0e9f4..64fb9e3b32d 100644 --- a/src/storage/UpdateEdgeProcessor.cpp +++ b/src/storage/UpdateEdgeProcessor.cpp @@ -339,7 +339,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { this->spaceId_ = req.get_space_id(); insertable_ = req.get_insertable(); auto partId = req.get_part_id(); - auto edgeKey = std::move(req).get_edge_key(); + auto edgeKey = req.get_edge_key(); std::vector eTypes; eTypes.emplace_back(edgeKey.get_edge_type()); this->initEdgeContext(eTypes); @@ -350,7 +350,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { this->onFinished(); return; } - updateItems_ = std::move(req).get_update_items(); + updateItems_ = req.get_update_items(); VLOG(3) << "Update edge, spaceId: " << this->spaceId_ << ", partId: " << partId << ", src: " << edgeKey.get_src() << ", edge_type: " << edgeKey.get_edge_type() diff --git a/src/storage/UpdateVertexProcessor.cpp b/src/storage/UpdateVertexProcessor.cpp index 7d268b2e4b4..12e9d49800e 100644 --- a/src/storage/UpdateVertexProcessor.cpp +++ b/src/storage/UpdateVertexProcessor.cpp @@ -323,7 +323,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) { return; } auto vId = req.get_vertex_id(); - updateItems_ = std::move(req).get_update_items(); + updateItems_ = req.get_update_items(); VLOG(3) << "Update vertex, spaceId: " << this->spaceId_ << ", partId: " << partId << ", vId: " << vId; diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 7f47b29ce22..855648a02b1 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -16,14 +16,12 @@ #include "network/NetworkUtils.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_secs); DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace storage { TEST(StorageClientTest, VerticesInterfacesTest) { - FLAGS_load_data_interval_secs = 1; FLAGS_heartbeat_interval_secs = 1; const nebula::ClusterID kClusterId = 10; fs::TempDir rootPath("/tmp/StorageClientTest.XXXXXX"); @@ -75,7 +73,7 @@ TEST(StorageClientTest, VerticesInterfacesTest) { ASSERT_TRUE(ret.ok()) << ret.status(); spaceId = ret.value(); LOG(INFO) << "Created space \"default\", its id is " << spaceId; - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); TestUtils::waitUntilAllElected(sc->kvStore_.get(), spaceId, 10); auto client = std::make_unique(threadPool, mClient.get()); diff --git a/src/storage/test/StorageHttpDownloadHandlerTest.cpp b/src/storage/test/StorageHttpDownloadHandlerTest.cpp index 7cd254321e5..873ca796c5d 100644 --- a/src/storage/test/StorageHttpDownloadHandlerTest.cpp +++ b/src/storage/test/StorageHttpDownloadHandlerTest.cpp @@ -14,7 +14,6 @@ #include "fs/TempDir.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_secs); namespace nebula { namespace storage { diff --git a/src/storage/test/StorageHttpStatusHandlerTest.cpp b/src/storage/test/StorageHttpStatusHandlerTest.cpp index dfabf6061db..abaa3a04101 100644 --- a/src/storage/test/StorageHttpStatusHandlerTest.cpp +++ b/src/storage/test/StorageHttpStatusHandlerTest.cpp @@ -15,7 +15,6 @@ #include "http/HttpClient.h" DECLARE_string(meta_server_addrs); -DECLARE_int32(load_data_interval_secs); namespace nebula { namespace storage { diff --git a/src/tools/storage-perf/StorageIntegrityTool.cpp b/src/tools/storage-perf/StorageIntegrityTool.cpp index 7608d5b5614..6a3e658fafe 100644 --- a/src/tools/storage-perf/StorageIntegrityTool.cpp +++ b/src/tools/storage-perf/StorageIntegrityTool.cpp @@ -22,7 +22,7 @@ DEFINE_int64(first_vertex_id, 1, "The smallest vertex id"); DEFINE_uint64(width, 100, "width of matrix"); DEFINE_uint64(height, 1000, "height of matrix"); -DECLARE_int32(load_data_interval_secs); +DECLARE_int32(heartbeat_interval_secs); namespace nebula { namespace storage { @@ -38,7 +38,7 @@ namespace storage { * update it after we suppport preheat. The tag must have only one int property, * which is prop_name. * 2. If the space and tag doesn't exists, it will try to create one, maybe you need to set - * load_data_interval_secs to make sure the storage service has load meta. + * heartbeat_interval_secs to make sure the storage service has load meta. * 3. The width and height is the size of the big linked list, you can refer to the graph below. * As expected, we can traverse the big linked list after width * height steps starting * from any node in the list. @@ -66,7 +66,7 @@ class IntegrityTest { private: bool init() { - FLAGS_load_data_interval_secs = 10; + FLAGS_heartbeat_interval_secs = 10; auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs); if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) { LOG(ERROR) << "Can't get metaServer address, status: " << metaAddrsRet.status() @@ -95,7 +95,7 @@ class IntegrityTest { auto tagResult = mClient_->getTagIDByNameFromCache(spaceId_, FLAGS_tag_name); if (!tagResult.ok()) { - sleep(FLAGS_load_data_interval_secs + 1); + sleep(FLAGS_heartbeat_interval_secs + 1); LOG(ERROR) << "Get tagId failed, try to create one: " << tagResult.status(); nebula::cpp2::Schema schema; nebula::cpp2::ColumnDef column;