From 203f4c2cce88a5de91668a3da969e2afc0288efa Mon Sep 17 00:00:00 2001
From: liuyu <52276794+liuyu85cn@users.noreply.github.com>
Date: Fri, 15 Oct 2021 17:38:49 +0800
Subject: [PATCH] accumulate bug fix for TOSS

---
 src/kvstore/NebulaStore.cpp                   |  4 -
 src/kvstore/raftex/RaftPart.cpp               | 80 +++----------
 src/storage/mutate/AddEdgesProcessor.cpp      |  2 -
 .../ChainAddEdgesProcessorLocal.cpp           |  2 -
 .../transaction/ChainAddEdgesProcessorLocal.h |  2 +-
 .../transaction/TransactionManager.cpp        |  3 -
 src/storage/transaction/TransactionManager.h  |  2 -
 7 files changed, 12 insertions(+), 83 deletions(-)

diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp
index ff926a5c2b6..250e263cb22 100644
--- a/src/kvstore/NebulaStore.cpp
+++ b/src/kvstore/NebulaStore.cpp
@@ -374,7 +374,6 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
     }
   }
   raftService_->addPartition(part);
-  LOG(INFO) << "TransactionManager onNewPartAdded_.size()=" << onNewPartAdded_.size();
   for (auto& func : onNewPartAdded_) {
     func.second(part);
   }
@@ -1197,11 +1196,8 @@ void NebulaStore::registerOnNewPartAdded(
     const std::string& funcName,
     std::function<void(std::shared_ptr<Part>&)> func,
     std::vector<std::pair<GraphSpaceID, PartitionID>>& existParts) {
-  LOG(INFO) << "spaces_.size() = " << spaces_.size();
   for (auto& item : spaces_) {
-    LOG(INFO) << "registerOnNewPartAdded() space = " << item.first;
     for (auto& partItem : item.second->parts_) {
-      LOG(INFO) << "registerOnNewPartAdded() part = " << partItem.first;
       existParts.emplace_back(std::make_pair(item.first, partItem.first));
       func(partItem.second);
     }
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index e426f290c79..5228d6dbd7a 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -586,7 +586,6 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
                                                         LogType logType,
                                                         std::string log,
                                                         AtomicOp op) {
-  std::string debugLog = log;
   if (blocking_) {
     // No need to block heartbeats and empty log.
     if ((logType == LogType::NORMAL && !log.empty()) || logType == LogType::ATOMIC_OP) {
@@ -604,17 +603,6 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
                  << "replicatingLogs_ :" << replicatingLogs_;
     return AppendLogResult::E_BUFFER_OVERFLOW;
   }
-  LogID firstId = 0;
-  TermID termId = 0;
-  AppendLogResult res;
-  {
-    std::lock_guard<std::mutex> g(raftLock_);
-    res = canAppendLogs();
-    if (res == AppendLogResult::SUCCEEDED) {
-      firstId = lastLogId_ + 1;
-      termId = term_;
-    }
-  }
 
   {
     std::lock_guard<std::mutex> lck(logsLock_);
@@ -647,30 +635,6 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
         break;
     }
 
-    if (!debugLog.empty()) {
-      switch (debugLog[sizeof(int64_t)]) {
-        case kvstore::OP_MULTI_PUT: {
-          auto kvs = kvstore::decodeMultiValues(debugLog);
-          for (size_t i = 0; i < kvs.size(); i += 2) {
-            VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
-                    << ", val = " << folly::hexlify(kvs[i + 1])
-                    << " res = " << static_cast<int>(res);
-          }
-          break;
-        }
-        case kvstore::OP_BATCH_WRITE: {
-          auto data = kvstore::decodeBatchValue(debugLog);
-          for (auto& opp : data) {
-            VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(opp.second.first)
-                    << ", val=" << folly::hexlify(opp.second.second);
-          }
-          break;
-        }
-        default:
-          break;
-      }
-    }
-
     bool expected = false;
     if (replicatingLogs_.compare_exchange_strong(expected, true)) {
       // We need to send logs to all followers
@@ -685,39 +649,17 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
     }
   }
 
-  // LogID firstId = 0;
-  // TermID termId = 0;
-  // AppendLogResult res;
-  // {
-  //   std::lock_guard<std::mutex> g(raftLock_);
-  //   res = canAppendLogs();
-  //   if (res == AppendLogResult::SUCCEEDED) {
-  //     firstId = lastLogId_ + 1;
-  //     termId = term_;
-  //   }
-  // }
-
-  // if (!debugLog.empty()) {
-  //   switch (debugLog[sizeof(int64_t)]) {
-  //     case kvstore::OP_MULTI_PUT: {
-  //       auto kvs = kvstore::decodeMultiValues(debugLog);
-  //       for (size_t i = 0; i < kvs.size(); i += 2) {
-  //         VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
-  //                 << " res = " << static_cast<int>(res);
-  //       }
-  //       break;
-  //     }
-  //     case kvstore::OP_BATCH_WRITE: {
-  //       auto data = kvstore::decodeBatchValue(debugLog);
-  //       for (auto& opp : data) {
-  //         VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(opp.second.first);
-  //       }
-  //       break;
-  //     }
-  //     default:
-  //       break;
-  //   }
-  // }
+  LogID firstId = 0;
+  TermID termId = 0;
+  AppendLogResult res;
+  {
+    std::lock_guard<std::mutex> g(raftLock_);
+    res = canAppendLogs();
+    if (res == AppendLogResult::SUCCEEDED) {
+      firstId = lastLogId_ + 1;
+      termId = term_;
+    }
+  }
 
   if (!checkAppendLogResult(res)) {
     // Most likely failed because the partition is not leader
diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp
index 4bda3990845..2d0b29f5916 100644
--- a/src/storage/mutate/AddEdgesProcessor.cpp
+++ b/src/storage/mutate/AddEdgesProcessor.cpp
@@ -124,8 +124,6 @@ void AddEdgesProcessor::doProcess(const cpp2::AddEdgesRequest& req) {
           code = writeResultTo(wRet, true);
           break;
         } else {
-          LOG(INFO) << "doProcess() key=" << folly::hexlify(key)
-                    << ", val=" << folly::hexlify(retEnc.value());
           data.emplace_back(std::move(key), std::move(retEnc.value()));
         }
       }
diff --git a/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp b/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
index f7e01efe66c..9c3cef073e8 100644
--- a/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
+++ b/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
@@ -306,7 +306,6 @@ std::vector<kvstore::KV> ChainAddEdgesProcessorLocal::makeDoublePrime() {
 void ChainAddEdgesProcessorLocal::erasePrime() {
   auto fn = [&](const cpp2::NewEdge& edge) {
     auto key = ConsistUtil::primeKey(spaceVidLen_, localPartId_, edge.get_key());
-    // VLOG(1) << uuid_ << "prepare to erase prime " << folly::hexlify(key);
     return key;
   };
   for (auto& edge : req_.get_parts().begin()->second) {
@@ -317,7 +316,6 @@ void ChainAddEdgesProcessorLocal::erasePrime() {
 void ChainAddEdgesProcessorLocal::eraseDoublePrime() {
   auto fn = [&](const cpp2::NewEdge& edge) {
     auto key = ConsistUtil::doublePrime(spaceVidLen_, localPartId_, edge.get_key());
-    // VLOG(1) << uuid_ << "prepare to erase double prime " << folly::hexlify(key);
     return key;
   };
   for (auto& edge : req_.get_parts().begin()->second) {
diff --git a/src/storage/transaction/ChainAddEdgesProcessorLocal.h b/src/storage/transaction/ChainAddEdgesProcessorLocal.h
index b2f7dff56ee..56dc2873972 100644
--- a/src/storage/transaction/ChainAddEdgesProcessorLocal.h
+++ b/src/storage/transaction/ChainAddEdgesProcessorLocal.h
@@ -133,7 +133,7 @@ class ChainAddEdgesProcessorLocal : public BaseProcessor<cpp2::ExecResponse>,
   cpp2::AddEdgesRequest req_;
   std::unique_ptr<TransactionManager::LockGuard> lk_{nullptr};
   int retryLimit_{10};
-  //
+  // need to restrict all phases to the same term.
   TermID restrictTerm_{-1};
   // set to true when prime insert succeeds
   // in processLocal(), we check this to determine if we need to do abort()
diff --git a/src/storage/transaction/TransactionManager.cpp b/src/storage/transaction/TransactionManager.cpp
index a347e7ce080..d23ab910df3 100644
--- a/src/storage/transaction/TransactionManager.cpp
+++ b/src/storage/transaction/TransactionManager.cpp
@@ -27,7 +27,6 @@ TransactionManager::TransactionManager(StorageEnv* env) : env_(env) {
   exec_ = std::make_shared<folly::IOThreadPoolExecutor>(10);
   iClient_ = env_->interClient_;
   resumeThread_ = std::make_unique<thread::GenericWorker>();
-  // scanAll();
   std::vector<std::pair<GraphSpaceID, PartitionID>> existParts;
   auto fn = std::bind(&TransactionManager::onNewPartAdded, this, std::placeholders::_1);
   static_cast<::nebula::kvstore::NebulaStore*>(env_->kvstore_)
@@ -42,9 +41,7 @@ TransactionManager::LockCore* TransactionManager::getLockCore(GraphSpaceID spaceId,
                                                               bool checkWhiteList) {
   if (checkWhiteList) {
     if (whiteListParts_.find(std::make_pair(spaceId, partId)) == whiteListParts_.end()) {
-      // LOG(INFO) << folly::sformat("space {}, part {} not in white list", spaceId, partId);
       return nullptr;
-      // scanPrimes(spaceId, partId);
     }
   }
   auto it = memLocks_.find(spaceId);
diff --git a/src/storage/transaction/TransactionManager.h b/src/storage/transaction/TransactionManager.h
index 3a3b4f284a2..7e8ff8f53db 100644
--- a/src/storage/transaction/TransactionManager.h
+++ b/src/storage/transaction/TransactionManager.h
@@ -115,8 +115,6 @@ class TransactionManager {
   /**
    * @brief only parts in this white list are allowed to get a lock
    */
   folly::ConcurrentHashMap<std::pair<GraphSpaceID, PartitionID>, int> whiteListParts_;
-  // std::mutex partWhiteListMu_;
-  // std::map<std::pair<GraphSpaceID, PartitionID>, int64_t> partWhiteList_;
 };
 }  // namespace storage
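
The functional change in this patch is the reordering inside RaftPart::appendLogAsync(): the leader-state snapshot (canAppendLogs(), lastLogId_ + 1, term_) used to be taken before the log was buffered, so when logs accumulated behind an in-flight replication round, the precomputed firstId could be stale by the time the thread actually started to replicate. The patch takes the snapshot only after the log is buffered and the thread has won the replicatingLogs_ CAS. Below is a minimal sketch of the corrected ordering; the types are simplified and replicate() is a hypothetical stand-in for the real send-to-followers path, so read it as a schematic rather than the actual RaftPart implementation.

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <string>
#include <vector>

enum class AppendLogResult { SUCCEEDED, E_NOT_A_LEADER, E_BUFFER_OVERFLOW };

class RaftPartSketch {
 public:
  AppendLogResult appendLogAsync(std::string log) {
    {
      // Step 1: buffer the log under logsLock_.
      std::lock_guard<std::mutex> lck(logsLock_);
      if (logs_.size() >= kMaxBufferedLogs) {
        return AppendLogResult::E_BUFFER_OVERFLOW;
      }
      logs_.emplace_back(std::move(log));

      // Step 2: only one thread may replicate at a time.
      bool expected = false;
      if (!replicatingLogs_.compare_exchange_strong(expected, true)) {
        // Someone else is replicating; our log is picked up when the
        // in-flight round swaps buffers. Any firstId computed earlier by
        // this thread would be stale by then -- which is exactly why the
        // patch moved the snapshot below this point.
        return AppendLogResult::SUCCEEDED;
      }
    }

    // Step 3: only the CAS winner snapshots leader state, under raftLock_.
    int64_t firstId = 0;
    int64_t termId = 0;
    AppendLogResult res;
    {
      std::lock_guard<std::mutex> g(raftLock_);
      res = canAppendLogs();
      if (res == AppendLogResult::SUCCEEDED) {
        firstId = lastLogId_ + 1;  // first id of the batch about to be sent
        termId = term_;
      }
    }
    if (res != AppendLogResult::SUCCEEDED) {
      replicatingLogs_.store(false);  // give up the replication slot
      return res;
    }

    replicate(firstId, termId);  // hypothetical; stands in for the real appendLogsInternal() path
    return AppendLogResult::SUCCEEDED;
  }

 private:
  AppendLogResult canAppendLogs() {
    return isLeader_ ? AppendLogResult::SUCCEEDED : AppendLogResult::E_NOT_A_LEADER;
  }

  void replicate(int64_t firstId, int64_t termId) {
    // Send buffered logs starting at firstId to followers at termId, then
    // release the slot (buffer swapping and retry handling elided).
    (void)firstId;
    (void)termId;
    replicatingLogs_.store(false);
  }

  static constexpr std::size_t kMaxBufferedLogs = 1024;
  std::mutex logsLock_;
  std::mutex raftLock_;
  std::vector<std::string> logs_;
  std::atomic<bool> replicatingLogs_{false};
  bool isLeader_{true};
  int64_t lastLogId_{0};
  int64_t term_{1};
};

The real code hands back futures tied to per-log promises; the point here is only the lock/CAS ordering: buffer, then CAS, then snapshot, so the snapshot always describes the state the winning thread will actually replicate from.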
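
On the TransactionManager side, getLockCore() keeps the TOSS white-list gate but drops the commented-out logging and scanPrimes() call: a part becomes lockable only after onNewPartAdded() has run for it, and callers simply receive nullptr until then. Below is a rough sketch of that gate, assuming a packed integer key (the real map is keyed by std::pair<GraphSpaceID, PartitionID>), a trivial LockCore in place of MemoryLockCore<std::string>, and a hypothetical markReady() in place of the real onNewPartAdded()/scanPrimes() flow.

#include <folly/concurrency/ConcurrentHashMap.h>

#include <cstdint>
#include <memory>

using GraphSpaceID = int32_t;
using PartitionID = int32_t;

struct LockCore {};  // stand-in; the real type is MemoryLockCore<std::string>

// Pack (space, part) into one 64-bit key so the default hasher applies.
inline uint64_t partKey(GraphSpaceID space, PartitionID part) {
  return (static_cast<uint64_t>(static_cast<uint32_t>(space)) << 32) |
         static_cast<uint32_t>(part);
}

class TxnManagerSketch {
 public:
  // Only white-listed parts may hand out a lock; everyone else gets nullptr
  // and must retry once the part is ready.
  LockCore* getLockCore(GraphSpaceID spaceId, PartitionID partId, bool checkWhiteList) {
    auto key = partKey(spaceId, partId);
    if (checkWhiteList && whiteList_.find(key) == whiteList_.cend()) {
      return nullptr;
    }
    auto it = locks_.find(key);
    return it == locks_.cend() ? nullptr : it->second.get();
  }

  // Hypothetical: in the real flow this happens after the part's leftover
  // prime keys have been scanned in.
  void markReady(GraphSpaceID spaceId, PartitionID partId) {
    auto key = partKey(spaceId, partId);
    locks_.insert_or_assign(key, std::make_shared<LockCore>());
    whiteList_.insert_or_assign(key, 0);
  }

 private:
  folly::ConcurrentHashMap<uint64_t, int> whiteList_;
  folly::ConcurrentHashMap<uint64_t, std::shared_ptr<LockCore>> locks_;
};

Returning nullptr instead of lazily scanning inside getLockCore() keeps the lock path non-blocking; the white list is populated exactly once per part, from the new-part callback registered via NebulaStore::registerOnNewPartAdded().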