Skip to content

Commit

Permalink
Improve heartbeat between metad and storaged by MetaClient
Browse files Browse the repository at this point in the history
1. Add the LastUpdateTimeMan to record the latest metadata change(insert, update, delete)
   time on the MetaServer side.
2. Enrich the information(lastUpdateTime, partitions leaderDist) of `heartbeat` exchange
   between MetaServer and MetaClient at storage node.
3. It could optimize the load data logic inside meta client by lastUpdateTime, which makes
   metadata synchronization smarter and more timely.
4. Remove sendHeartBeat_ and load_data_interval_secs from MetaClient class, and add the
   bool inStoraged_ to indicate whether a metaclient is in storaged.

close vesoft-inc#1173
close vesoft-inc#1060
  • Loading branch information
zhangguoqing committed Nov 13, 2019
1 parent d3a1fd0 commit 4745138
Show file tree
Hide file tree
Showing 55 changed files with 553 additions and 272 deletions.
2 changes: 1 addition & 1 deletion src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "kvstore/PartManager.h"
#include "meta/ClusterIdMan.h"
#include "kvstore/NebulaStore.h"
#include "meta/ActiveHostsMan.h"
#include "meta/ActiveMan.h"
#include "meta/KVBasedGflagsManager.h"

using nebula::operator<<;
Expand Down
6 changes: 5 additions & 1 deletion src/graph/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ Status ExecutionEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExec
if (!addrs.ok()) {
return addrs.status();
}
metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs.value()));
metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor,
std::move(addrs.value()),
HostAddr(0, 0),
0,
false);
// load data try 3 time
bool loadDataOk = metaClient_->waitForMetadReady(3);
if (!loadDataOk) {
Expand Down
2 changes: 0 additions & 2 deletions src/graph/test/ConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
#include "meta/test/TestUtils.h"
#include "storage/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);

namespace nebula {
namespace graph {

Expand Down
4 changes: 2 additions & 2 deletions src/graph/test/DataTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "graph/test/TestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);
DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -134,7 +134,7 @@ AssertionResult DataTest::prepareSchema() {
<< " failed, error code "<< static_cast<int32_t>(code);
}
}
sleep(FLAGS_load_data_interval_secs + 3);
sleep(FLAGS_heartbeat_interval_secs + 3);
return TestOK();
}

Expand Down
2 changes: 0 additions & 2 deletions src/graph/test/GroupByLimitTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "graph/test/TraverseTestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);

namespace nebula {
namespace graph {

Expand Down
3 changes: 0 additions & 3 deletions src/graph/test/SchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "meta/test/TestUtils.h"
#include "storage/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);

namespace nebula {
namespace graph {

Expand Down Expand Up @@ -686,7 +684,6 @@ TEST_F(SchemaTest, metaCommunication) {
ASSERT_EQ(1, (*(resp.get_rows())).size());
}

sleep(FLAGS_load_data_interval_secs + 1);
int retry = 60;
while (retry-- > 0) {
auto spaceResult = gEnv->metaClient()->getSpaceIdByNameFromCache("default_space");
Expand Down
6 changes: 3 additions & 3 deletions src/graph/test/TestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "meta/test/TestUtils.h"
#include "storage/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);
DECLARE_int32(heartbeat_interval_secs);
DECLARE_string(meta_server_addrs);

namespace nebula {
Expand All @@ -27,7 +27,7 @@ TestEnv::~TestEnv() {


void TestEnv::SetUp() {
FLAGS_load_data_interval_secs = 1;
FLAGS_heartbeat_interval_secs = 1;
const nebula::ClusterID kClusterId = 10;
// Create metaServer
metaServer_ = nebula::meta::TestUtils::mockMetaServer(
Expand Down Expand Up @@ -72,7 +72,7 @@ void TestEnv::SetUp() {

void TestEnv::TearDown() {
// TO make sure the drop space be invoked on storage server
sleep(FLAGS_load_data_interval_secs + 1);
sleep(FLAGS_heartbeat_interval_secs + 1);
graphServer_.reset();
storageServer_.reset();
mClient_.reset();
Expand Down
4 changes: 2 additions & 2 deletions src/graph/test/TraverseTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "graph/test/TestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);
DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -403,7 +403,7 @@ AssertionResult TraverseTestBase::prepareSchema() {
return TestError() << "Do cmd:" << cmd << " failed";
}
}
sleep(FLAGS_load_data_interval_secs + 3);
sleep(FLAGS_heartbeat_interval_secs + 3);
return TestOK();
}

Expand Down
2 changes: 0 additions & 2 deletions src/graph/test/UpdateTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "graph/test/UpdateTestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);

namespace nebula {
namespace graph {

Expand Down
4 changes: 2 additions & 2 deletions src/graph/test/UpdateTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "graph/test/TestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);
DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -118,7 +118,7 @@ AssertionResult UpdateTestBase::prepareSchema() {
return TestError() << "Do cmd:" << cmd << " failed";
}
}
sleep(FLAGS_load_data_interval_secs + 3);
sleep(FLAGS_heartbeat_interval_secs + 3);
return TestOK();
}

Expand Down
7 changes: 5 additions & 2 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,14 @@ struct HBResp {
1: ErrorCode code,
2: common.HostAddr leader,
3: common.ClusterID cluster_id,
4: i64 last_update_time_in_ms,
}

struct HBReq {
1: common.HostAddr host,
2: common.ClusterID cluster_id,
1: bool in_storaged,
2: common.HostAddr host,
3: common.ClusterID cluster_id,
4: optional map<common.GraphSpaceID, list<common.PartitionID>> (cpp.template = "std::unordered_map") leader_parts;
}

struct CreateUserReq {
Expand Down
8 changes: 4 additions & 4 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,10 @@ class NebulaStore : public KVStore, public Handler {

ResultCode flush(GraphSpaceID spaceId) override;

int32_t allLeader(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) override;

bool isLeader(GraphSpaceID spaceId, PartitionID partId);

ErrorOr<ResultCode, std::shared_ptr<SpacePartInfo>> space(GraphSpaceID spaceId);

/**
* Implement four interfaces in Handler.
* */
Expand All @@ -177,7 +176,8 @@ class NebulaStore : public KVStore, public Handler {

void removePart(GraphSpaceID spaceId, PartitionID partId) override;

ErrorOr<ResultCode, std::shared_ptr<SpacePartInfo>> space(GraphSpaceID spaceId);
int32_t allLeader(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) override;

private:
void updateSpaceOption(GraphSpaceID spaceId,
Expand Down
9 changes: 9 additions & 0 deletions src/kvstore/PartManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,14 @@ void MetaServerBasedPartManager::onPartUpdated(const PartMeta& partMeta) {
UNUSED(partMeta);
}

void MetaServerBasedPartManager::fetchLeaderInfo(
std::unordered_map<GraphSpaceID, std::vector<PartitionID>>& leaderIds) {
if (handler_ != nullptr) {
handler_->allLeader(leaderIds);
} else {
VLOG(1) << "handler_ is nullptr!";
}
}

} // namespace kvstore
} // namespace nebula
5 changes: 5 additions & 0 deletions src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class Handler {
bool isDbOption) = 0;
virtual void removeSpace(GraphSpaceID spaceId) = 0;
virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0;
virtual int32_t allLeader(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) = 0;
};


Expand Down Expand Up @@ -164,6 +166,9 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL

void onPartUpdated(const PartMeta& partMeta) override;

void fetchLeaderInfo(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) override;

HostAddr getLocalHost() {
return localHost_;
}
Expand Down
65 changes: 0 additions & 65 deletions src/meta/ActiveHostsMan.h

This file was deleted.

39 changes: 34 additions & 5 deletions src/meta/ActiveHostsMan.cpp → src/meta/ActiveMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "meta/ActiveHostsMan.h"
#include "meta/ActiveMan.h"
#include "meta/MetaServiceUtils.h"
#include "meta/processors/Common.h"

Expand All @@ -14,13 +14,42 @@ DEFINE_int32(expired_threshold_sec, 10 * 60,
namespace nebula {
namespace meta {

kvstore::ResultCode LastUpdateTimeMan::update(kvstore::KVStore* kv, const TimeInfo& info) {
CHECK_NOTNULL(kv);
std::vector<kvstore::KV> data;
data.emplace_back(MetaServiceUtils::lastUpdateTimeKey(), TimeInfo::encode(info));
folly::SharedMutex::WriteHolder wHolder(LockUtils::lastUpdateTimeLock());
folly::Baton<true, std::atomic> baton;
kvstore::ResultCode ret;
kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data),
[&] (kvstore::ResultCode code) {
ret = code;
baton.post();
});
baton.wait();
return ret;
}

int64_t LastUpdateTimeMan::get(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
auto key = MetaServiceUtils::lastUpdateTimeKey();
std::string val;
auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val);
if (ret == kvstore::ResultCode::SUCCEEDED) {
TimeInfo info = TimeInfo::decode(val);
return info.lastTime_;
}
return 0;
}


kvstore::ResultCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv,
const HostAddr& hostAddr,
const HostInfo& info) {
const TimeInfo& info) {
CHECK_NOTNULL(kv);
std::vector<kvstore::KV> data;
data.emplace_back(MetaServiceUtils::hostKey(hostAddr.first, hostAddr.second),
HostInfo::encode(info));
TimeInfo::encode(info));
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
folly::Baton<true, std::atomic> baton;
kvstore::ResultCode ret;
Expand All @@ -45,8 +74,8 @@ std::vector<HostAddr> ActiveHostsMan::getActiveHosts(kvstore::KVStore* kv, int32
auto now = time::WallClock::fastNowInMilliSec();
while (iter->valid()) {
auto host = MetaServiceUtils::parseHostKey(iter->key());
HostInfo info = HostInfo::decode(iter->val());
if (now - info.lastHBTimeInMilliSec_ < threshold) {
TimeInfo info = TimeInfo::decode(iter->val());
if (now - info.lastTime_ < threshold) {
hosts.emplace_back(host.ip, host.port);
}
iter->next();
Expand Down
Loading

0 comments on commit 4745138

Please sign in to comment.