Skip to content

Commit

Permalink
add some debug info
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyu85cn committed Oct 15, 2021
1 parent e1ff44f commit a432011
Show file tree
Hide file tree
Showing 18 changed files with 261 additions and 65 deletions.
2 changes: 1 addition & 1 deletion src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
}
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
VLOG(1) << "leader host: " << leader;
VLOG(2) << "leader host: " << leader;

cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion);
auto resp = getResponse(
Expand Down
17 changes: 17 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
}
}
raftService_->addPartition(part);
LOG(INFO) << "TransactionManager onNewPartAdded_.size()=" << onNewPartAdded_.size();
for (auto& func : onNewPartAdded_) {
func.second(part);
}
Expand Down Expand Up @@ -1192,5 +1193,21 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> NebulaStore::getProperty(
return folly::toJson(obj);
}

void NebulaStore::registerOnNewPartAdded(
const std::string& funcName,
std::function<void(std::shared_ptr<Part>&)> func,
std::vector<std::pair<GraphSpaceID, PartitionID>>& existParts) {
LOG(INFO) << "spaces_.size() = " << spaces_.size();
for (auto& item : spaces_) {
LOG(INFO) << "registerOnNewPartAdded() space = " << item.first;
for (auto& partItem : item.second->parts_) {
LOG(INFO) << "registerOnNewPartAdded() part = " << partItem.first;
existParts.emplace_back(std::make_pair(item.first, partItem.first));
func(partItem.second);
}
}
onNewPartAdded_.insert(std::make_pair(funcName, func));
}

} // namespace kvstore
} // namespace nebula
5 changes: 2 additions & 3 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,8 @@ class NebulaStore : public KVStore, public Handler {
ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(GraphSpaceID spaceId,
const std::string& property) override;
void registerOnNewPartAdded(const std::string& funcName,
std::function<void(std::shared_ptr<Part>&)> func) {
onNewPartAdded_.insert(std::make_pair(funcName, func));
}
std::function<void(std::shared_ptr<Part>&)> func,
std::vector<std::pair<GraphSpaceID, PartitionID>>& existParts);

void unregisterOnNewPartAdded(const std::string& funcName) { onNewPartAdded_.erase(funcName); }

Expand Down
21 changes: 19 additions & 2 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,18 @@ void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) {

void Part::setBlocking(bool sign) { blocking_ = sign; }

void Part::onLostLeadership(TermID term) { VLOG(1) << "Lost the leadership for the term " << term; }
void Part::onLostLeadership(TermID term) {
VLOG(1) << "Lost the leadership for the term " << term;

CallbackOptions opt;
opt.spaceId = spaceId_;
opt.partId = partId_;
opt.term = term_;

for (auto& cb : leaderLostCB_) {
cb(opt);
}
}

void Part::onElected(TermID term) {
VLOG(1) << "Being elected as the leader for the term: " << term;
Expand All @@ -191,7 +202,9 @@ void Part::onLeaderReady(TermID term) {
}
}

void Part::registerOnLeaderReady(LeaderReadyCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); }
void Part::registerOnLeaderReady(LeaderChagneCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); }

void Part::registerOnLeaderLost(LeaderChagneCB cb) { leaderLostCB_.emplace_back(std::move(cb)); }

void Part::onDiscoverNewLeader(HostAddr nLeader) {
LOG(INFO) << idStr_ << "Find the new leader " << nLeader;
Expand Down Expand Up @@ -231,6 +244,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr<LogIterator> iter, bool wait) {
// Make the number of values are an even number
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
<< ", val = " << folly::hexlify(kvs[i + 1]);
auto code = batch->put(kvs[i], kvs[i + 1]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()";
Expand Down Expand Up @@ -272,6 +287,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr<LogIterator> iter, bool wait) {
case OP_BATCH_WRITE: {
auto data = decodeBatchValue(log);
for (auto& op : data) {
VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first)
<< ", val=" << folly::hexlify(op.second.second);
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
if (op.first == BatchLogType::OP_BATCH_PUT) {
code = batch->put(op.second.first, op.second.second);
Expand Down
9 changes: 6 additions & 3 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,18 @@ class Part : public raftex::RaftPart {
TermID term;
};

using LeaderReadyCB = std::function<void(const CallbackOptions& opt)>;
void registerOnLeaderReady(LeaderReadyCB cb);
using LeaderChagneCB = std::function<void(const CallbackOptions& opt)>;
void registerOnLeaderReady(LeaderChagneCB cb);

void registerOnLeaderLost(LeaderChagneCB cb);

protected:
GraphSpaceID spaceId_;
PartitionID partId_;
std::string walPath_;
NewLeaderCallback newLeaderCb_ = nullptr;
std::vector<LeaderReadyCB> leaderReadyCB_;
std::vector<LeaderChagneCB> leaderReadyCB_;
std::vector<LeaderChagneCB> leaderLostCB_;

private:
KVEngine* engine_ = nullptr;
Expand Down
84 changes: 73 additions & 11 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/thrift/ThriftClientManager.h"
#include "common/time/WallClock.h"
#include "interface/gen-cpp2/RaftexServiceAsyncClient.h"
#include "kvstore/LogEncoder.h"
#include "kvstore/raftex/Host.h"
#include "kvstore/raftex/LogStrListIterator.h"
#include "kvstore/wal/FileBasedWal.h"
Expand Down Expand Up @@ -585,6 +586,7 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
LogType logType,
std::string log,
AtomicOp op) {
std::string debugLog = log;
if (blocking_) {
// No need to block heartbeats and empty log.
if ((logType == LogType::NORMAL && !log.empty()) || logType == LogType::ATOMIC_OP) {
Expand All @@ -602,6 +604,17 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
<< "replicatingLogs_ :" << replicatingLogs_;
return AppendLogResult::E_BUFFER_OVERFLOW;
}
LogID firstId = 0;
TermID termId = 0;
AppendLogResult res;
{
std::lock_guard<std::mutex> g(raftLock_);
res = canAppendLogs();
if (res == AppendLogResult::SUCCEEDED) {
firstId = lastLogId_ + 1;
termId = term_;
}
}
{
std::lock_guard<std::mutex> lck(logsLock_);

Expand Down Expand Up @@ -634,6 +647,30 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
break;
}

if (!debugLog.empty()) {
switch (debugLog[sizeof(int64_t)]) {
case kvstore::OP_MULTI_PUT: {
auto kvs = kvstore::decodeMultiValues(debugLog);
for (size_t i = 0; i < kvs.size(); i += 2) {
VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
<< ", val = " << folly::hexlify(kvs[i + 1])
<< " res = " << static_cast<int>(res);
}
break;
}
case kvstore::OP_BATCH_WRITE: {
auto data = kvstore::decodeBatchValue(debugLog);
for (auto& opp : data) {
VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(opp.second.first)
<< ", val=" << folly::hexlify(opp.second.second);
}
break;
}
default:
break;
}
}

bool expected = false;
if (replicatingLogs_.compare_exchange_strong(expected, true)) {
// We need to send logs to all followers
Expand All @@ -648,17 +685,39 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
}
}

LogID firstId = 0;
TermID termId = 0;
AppendLogResult res;
{
std::lock_guard<std::mutex> g(raftLock_);
res = canAppendLogs();
if (res == AppendLogResult::SUCCEEDED) {
firstId = lastLogId_ + 1;
termId = term_;
}
}
// LogID firstId = 0;
// TermID termId = 0;
// AppendLogResult res;
// {
// std::lock_guard<std::mutex> g(raftLock_);
// res = canAppendLogs();
// if (res == AppendLogResult::SUCCEEDED) {
// firstId = lastLogId_ + 1;
// termId = term_;
// }
// }

// if (!debugLog.empty()) {
// switch (debugLog[sizeof(int64_t)]) {
// case kvstore::OP_MULTI_PUT: {
// auto kvs = kvstore::decodeMultiValues(debugLog);
// for (size_t i = 0; i < kvs.size(); i += 2) {
// VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
// << " res = " << static_cast<int>(res);
// }
// break;
// }
// case kvstore::OP_BATCH_WRITE: {
// auto data = kvstore::decodeBatchValue(debugLog);
// for (auto& opp : data) {
// VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(opp.second.first);
// }
// break;
// }
// default:
// break;
// }
// }

if (!checkAppendLogResult(res)) {
// Mosy likely failed because the parttion is not leader
Expand Down Expand Up @@ -1335,6 +1394,9 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
<< " i did not commit when i was leader, rollback to " << lastLogId_;
wal_->rollbackToLog(lastLogId_);
}
if (role_ == Role::LEADER) {
bgWorkers_->addTask([self = shared_from_this(), term] { self->onLostLeadership(term); });
}
role_ = Role::FOLLOWER;
votedAddr_ = candidate;
proposedTerm_ = req.get_term();
Expand Down
2 changes: 2 additions & 0 deletions src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ void AddEdgesProcessor::doProcess(const cpp2::AddEdgesRequest& req) {
code = writeResultTo(wRet, true);
break;
} else {
LOG(INFO) << "doProcess() key=" << folly::hexlify(key)
<< ", val=" << folly::hexlify(retEnc.value());
data.emplace_back(std::move(key), std::move(retEnc.value()));
}
}
Expand Down
Loading

0 comments on commit a432011

Please sign in to comment.