Skip to content

Commit

Permalink
Improve heartbeat between metad and storaged by MetaClient
Browse files Browse the repository at this point in the history
1. Add LastUpdateTimeMan to record the time of the latest metadata change (insert, update,
   delete) on the MetaServer side.
2. Enrich the information (lastUpdateTime, partition leader distribution) exchanged in the
   `heartbeat` between the MetaServer and the MetaClient on storage nodes.
3. Optimize the data-loading logic inside the meta client by using lastUpdateTime, which makes
   metadata synchronization smarter and more timely.
4. Remove sendHeartBeat_ and load_data_interval_secs from the MetaClient class, and add the
   bool inStoraged_ to indicate whether a MetaClient is running inside storaged.

close vesoft-inc#1173
close vesoft-inc#1060
close vesoft-inc#284
  • Loading branch information
zhangguoqing committed Nov 22, 2019
1 parent a3ffc7d commit a93774d
Show file tree
Hide file tree
Showing 42 changed files with 335 additions and 180 deletions.
6 changes: 5 additions & 1 deletion src/graph/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ Status ExecutionEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExec
if (!addrs.ok()) {
return addrs.status();
}
metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs.value()));
metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor,
std::move(addrs.value()),
HostAddr(0, 0),
0,
false);
// try to load data up to 3 times
bool loadDataOk = metaClient_->waitForMetadReady(3);
if (!loadDataOk) {
Expand Down
2 changes: 0 additions & 2 deletions src/graph/test/ConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
#include "meta/test/TestUtils.h"
#include "storage/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);

namespace nebula {
namespace graph {

Expand Down
4 changes: 2 additions & 2 deletions src/graph/test/DataTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "graph/test/TestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);
DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -134,7 +134,7 @@ AssertionResult DataTest::prepareSchema() {
<< " failed, error code "<< static_cast<int32_t>(code);
}
}
sleep(FLAGS_load_data_interval_secs + 3);
sleep(FLAGS_heartbeat_interval_secs + 3);
return TestOK();
}

Expand Down
2 changes: 0 additions & 2 deletions src/graph/test/GroupByLimitTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "graph/test/TraverseTestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);

namespace nebula {
namespace graph {

Expand Down
3 changes: 0 additions & 3 deletions src/graph/test/SchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "meta/test/TestUtils.h"
#include "storage/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);

namespace nebula {
namespace graph {

Expand Down Expand Up @@ -686,7 +684,6 @@ TEST_F(SchemaTest, metaCommunication) {
ASSERT_EQ(1, (*(resp.get_rows())).size());
}

sleep(FLAGS_load_data_interval_secs + 1);
int retry = 60;
while (retry-- > 0) {
auto spaceResult = gEnv->metaClient()->getSpaceIdByNameFromCache("default_space");
Expand Down
6 changes: 3 additions & 3 deletions src/graph/test/TestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "meta/test/TestUtils.h"
#include "storage/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);
DECLARE_int32(heartbeat_interval_secs);
DECLARE_string(meta_server_addrs);

namespace nebula {
Expand All @@ -27,7 +27,7 @@ TestEnv::~TestEnv() {


void TestEnv::SetUp() {
FLAGS_load_data_interval_secs = 1;
FLAGS_heartbeat_interval_secs = 1;
const nebula::ClusterID kClusterId = 10;
// Create metaServer
metaServer_ = nebula::meta::TestUtils::mockMetaServer(
Expand Down Expand Up @@ -72,7 +72,7 @@ void TestEnv::SetUp() {

void TestEnv::TearDown() {
// To make sure the drop space is invoked on the storage server
sleep(FLAGS_load_data_interval_secs + 1);
sleep(FLAGS_heartbeat_interval_secs + 1);
graphServer_.reset();
storageServer_.reset();
mClient_.reset();
Expand Down
4 changes: 2 additions & 2 deletions src/graph/test/TraverseTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "graph/test/TestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);
DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -403,7 +403,7 @@ AssertionResult TraverseTestBase::prepareSchema() {
return TestError() << "Do cmd:" << cmd << " failed";
}
}
sleep(FLAGS_load_data_interval_secs + 3);
sleep(FLAGS_heartbeat_interval_secs + 3);
return TestOK();
}

Expand Down
2 changes: 0 additions & 2 deletions src/graph/test/UpdateTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "graph/test/UpdateTestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);

namespace nebula {
namespace graph {

Expand Down
4 changes: 2 additions & 2 deletions src/graph/test/UpdateTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "graph/test/TestBase.h"
#include "meta/test/TestUtils.h"

DECLARE_int32(load_data_interval_secs);
DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -118,7 +118,7 @@ AssertionResult UpdateTestBase::prepareSchema() {
return TestError() << "Do cmd:" << cmd << " failed";
}
}
sleep(FLAGS_load_data_interval_secs + 3);
sleep(FLAGS_heartbeat_interval_secs + 3);
return TestOK();
}

Expand Down
7 changes: 5 additions & 2 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,14 @@ struct HBResp {
1: ErrorCode code,
2: common.HostAddr leader,
3: common.ClusterID cluster_id,
4: i64 last_update_time_in_ms,
}

struct HBReq {
1: common.HostAddr host,
2: common.ClusterID cluster_id,
1: bool in_storaged,
2: common.HostAddr host,
3: common.ClusterID cluster_id,
4: optional map<common.GraphSpaceID, list<common.PartitionID>> (cpp.template = "std::unordered_map") leader_partIds;
}

struct CreateUserReq {
Expand Down
8 changes: 4 additions & 4 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,10 @@ class NebulaStore : public KVStore, public Handler {

ResultCode flush(GraphSpaceID spaceId) override;

int32_t allLeader(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) override;

bool isLeader(GraphSpaceID spaceId, PartitionID partId);

ErrorOr<ResultCode, std::shared_ptr<SpacePartInfo>> space(GraphSpaceID spaceId);

/**
* Implement four interfaces in Handler.
* */
Expand All @@ -177,7 +176,8 @@ class NebulaStore : public KVStore, public Handler {

void removePart(GraphSpaceID spaceId, PartitionID partId) override;

ErrorOr<ResultCode, std::shared_ptr<SpacePartInfo>> space(GraphSpaceID spaceId);
int32_t allLeader(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) override;

private:
void updateSpaceOption(GraphSpaceID spaceId,
Expand Down
9 changes: 9 additions & 0 deletions src/kvstore/PartManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,14 @@ void MetaServerBasedPartManager::onPartUpdated(const PartMeta& partMeta) {
UNUSED(partMeta);
}

// Forwards a leader-distribution query to the registered handler.
//
// When a handler is set, `leaderIds` is passed to handler_->allLeader(), which
// is expected to populate it; the count returned by allLeader() is discarded.
// When no handler is set, `leaderIds` is left untouched and the condition is
// only reported through verbose logging.
void MetaServerBasedPartManager::fetchLeaderInfo(
        std::unordered_map<GraphSpaceID, std::vector<PartitionID>>& leaderIds) {
    if (handler_ == nullptr) {
        VLOG(1) << "handler_ is nullptr!";
        return;
    }
    handler_->allLeader(leaderIds);
}

} // namespace kvstore
} // namespace nebula
5 changes: 5 additions & 0 deletions src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class Handler {
bool isDbOption) = 0;
virtual void removeSpace(GraphSpaceID spaceId) = 0;
virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0;
virtual int32_t allLeader(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) = 0;
};


Expand Down Expand Up @@ -164,6 +166,9 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL

void onPartUpdated(const PartMeta& partMeta) override;

void fetchLeaderInfo(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderParts) override;

HostAddr getLocalHost() {
return localHost_;
}
Expand Down
29 changes: 29 additions & 0 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,34 @@ bool ActiveHostsMan::isLived(kvstore::KVStore* kv, const HostAddr& host) {
return std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end();
}

// Persists `timeInMilliSec` as the last-update timestamp in the meta KV store.
//
// The write is issued through asyncMultiPut() but this function blocks on a
// baton until the completion callback has fired, so it behaves synchronously
// for the caller. The write lock on lastUpdateTimeLock() is held for the whole
// put-and-wait sequence.
//
// @param kv              KV store to write to; must not be null.
// @param timeInMilliSec  Timestamp to record.
// @return                The result code reported by the store's callback.
kvstore::ResultCode LastUpdateTimeMan::update(kvstore::KVStore* kv, const int64_t timeInMilliSec) {
    CHECK_NOTNULL(kv);
    std::vector<kvstore::KV> kvs;
    kvs.emplace_back(MetaServiceUtils::lastUpdateTimeKey(),
                     MetaServiceUtils::lastUpdateTimeVal(timeInMilliSec));

    folly::SharedMutex::WriteHolder lockGuard(LockUtils::lastUpdateTimeLock());
    folly::Baton<true, std::atomic> done;
    kvstore::ResultCode result;
    // Capturing locals by reference is safe here: done.wait() below keeps them
    // alive until the callback has run.
    kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(kvs),
                      [&result, &done] (kvstore::ResultCode code) {
                          result = code;
                          done.post();
                      });
    done.wait();
    return result;
}

// Reads the last-update timestamp from the meta KV store.
//
// @param kv  KV store to read from; must not be null.
// @return    The stored timestamp, or 0 when the key cannot be read or the
//            stored value is too short to hold an int64_t.
int64_t LastUpdateTimeMan::get(kvstore::KVStore* kv) {
    CHECK_NOTNULL(kv);
    const auto key = MetaServiceUtils::lastUpdateTimeKey();
    std::string val;
    const auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val);
    // Treat a truncated/corrupted value like a missing one instead of reading
    // past the end of the buffer.
    if (ret != kvstore::ResultCode::SUCCEEDED || val.size() < sizeof(int64_t)) {
        return 0;
    }
    // Copy byte-wise rather than dereferencing a casted pointer: the string
    // buffer holds chars, not an int64_t object, and carries no alignment
    // guarantee for int64_t.
    int64_t lastUpdateTime = 0;
    std::copy_n(val.data(), sizeof(int64_t), reinterpret_cast<char*>(&lastUpdateTime));
    return lastUpdateTime;
}

} // namespace meta
} // namespace nebula
12 changes: 12 additions & 0 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ class ActiveHostsMan final {
ActiveHostsMan() = default;
};

/**
 * Static-only helper that stores and retrieves a single "last update time"
 * value (in milliseconds) through a KVStore.
 * The constructor is not public, so the class is used only via its static
 * member functions.
 */
class LastUpdateTimeMan final {
public:
    ~LastUpdateTimeMan() = default;

    // Returns the timestamp currently stored in `kv`.
    static int64_t get(kvstore::KVStore* kv);

    // Stores `timeInMilliSec` in `kv` and returns the store's result code.
    static kvstore::ResultCode update(kvstore::KVStore* kv, const int64_t timeInMilliSec);

protected:
    LastUpdateTimeMan() = default;
};

} // namespace meta
} // namespace nebula

Expand Down
28 changes: 26 additions & 2 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,25 @@ const std::string kIndexTable = "__index__"; // NOLINT
const std::string kUsersTable = "__users__"; // NOLINT
const std::string kRolesTable = "__roles__"; // NOLINT
const std::string kConfigsTable = "__configs__"; // NOLINT

const std::string kLastUpdateTimeTable = "__last_update_time__"; // NOLINT

const std::string kHostOnline = "Online"; // NOLINT
const std::string kHostOffline = "Offline"; // NOLINT

// Builds the key under which the last-update timestamp is stored.
// The key consists solely of the kLastUpdateTimeTable name.
std::string MetaServiceUtils::lastUpdateTimeKey() {
    std::string result;
    result.reserve(128);
    result += kLastUpdateTimeTable;
    return result;
}

// Encodes `timeInMilliSec` as its raw in-memory bytes (sizeof(int64_t) bytes,
// host byte order) so it can be stored as a KV value.
std::string MetaServiceUtils::lastUpdateTimeVal(const int64_t timeInMilliSec) {
    const auto* rawBytes = reinterpret_cast<const char*>(&timeInMilliSec);
    return std::string(rawBytes, sizeof(int64_t));
}

std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) {
std::string key;
key.reserve(128);
Expand All @@ -39,7 +53,7 @@ std::string MetaServiceUtils::spaceVal(const cpp2::SpaceProperties &properties)
return val;
}

cpp2::SpaceProperties MetaServiceUtils::parseSpace(folly::StringPiece rawData) {
cpp2::SpaceProperties MetaServiceUtils::parseSpace(folly::StringPiece rawData) {
cpp2::SpaceProperties properties;
apache::thrift::CompactSerializer::deserialize(rawData, properties);
return properties;
Expand All @@ -66,6 +80,16 @@ std::string MetaServiceUtils::partKey(GraphSpaceID spaceId, PartitionID partId)
return key;
}

// Extracts the GraphSpaceID that sits immediately after the kPartsTable
// prefix in `key`.
// NOTE(review): no length check is performed; presumably `key` is always a
// key produced by partKey() -- confirm against callers.
GraphSpaceID MetaServiceUtils::parsePartKeySpaceId(folly::StringPiece key) {
    const char* spaceIdPos = key.data() + kPartsTable.size();
    return *reinterpret_cast<const GraphSpaceID*>(spaceIdPos);
}

// Extracts the PartitionID from `key`, read at the offset that skips the
// kPartsTable prefix and one GraphSpaceID.
// NOTE(review): no length check is performed; presumably `key` is always a
// key produced by partKey() -- confirm against callers.
PartitionID MetaServiceUtils::parsePartKeyPartId(folly::StringPiece key) {
    const auto partIdOffset = kPartsTable.size() + sizeof(GraphSpaceID);
    return *reinterpret_cast<const PartitionID*>(key.data() + partIdOffset);
}

std::string MetaServiceUtils::partVal(const std::vector<nebula::cpp2::HostAddr>& hosts) {
std::string val;
val.reserve(128);
Expand Down
9 changes: 9 additions & 0 deletions src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "base/Base.h"
#include "base/Status.h"
#include "interface/gen-cpp2/meta_types.h"
#include "meta/ActiveHostsMan.h"

namespace nebula {
namespace meta {
Expand All @@ -28,6 +29,10 @@ class MetaServiceUtils final {
public:
MetaServiceUtils() = delete;

static std::string lastUpdateTimeKey();

static std::string lastUpdateTimeVal(const int64_t timeInMilliSec);

static std::string spaceKey(GraphSpaceID spaceId);

static std::string spaceVal(const cpp2::SpaceProperties &properties);
Expand All @@ -42,6 +47,10 @@ class MetaServiceUtils final {

static std::string partKey(GraphSpaceID spaceId, PartitionID partId);

static GraphSpaceID parsePartKeySpaceId(folly::StringPiece key);

static PartitionID parsePartKeyPartId(folly::StringPiece key);

static std::string partVal(const std::vector<nebula::cpp2::HostAddr>& hosts);

static const std::string& partPrefix();
Expand Down
Loading

0 comments on commit a93774d

Please sign in to comment.