Skip to content

Commit

Permalink
Add http interfaces for flush and compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
heng committed Jul 25, 2019
1 parent 7017410 commit ee8bef3
Show file tree
Hide file tree
Showing 19 changed files with 411 additions and 63 deletions.
12 changes: 7 additions & 5 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "storage/StorageServiceHandler.h"
#include "storage/StorageHttpStatusHandler.h"
#include "storage/StorageHttpDownloadHandler.h"
#include "storage/StorageHttpAdminHandler.h"
#include "kvstore/NebulaStore.h"
#include "kvstore/PartManager.h"
#include "process/ProcessUtils.h"
Expand Down Expand Up @@ -168,22 +169,23 @@ int main(int argc, char *argv[]) {
ioThreadPool,
metaClient.get(),
schemaMan.get());
auto *kvstore_ = kvstore.get();

std::unique_ptr<nebula::hdfs::HdfsHelper> helper =
std::make_unique<nebula::hdfs::HdfsCommandHelper>();
auto *helperPtr = helper.get();
auto* helperPtr = helper.get();

LOG(INFO) << "Starting Storage HTTP Service";
nebula::WebService::registerHandler("/status", [] {
return new nebula::storage::StorageHttpStatusHandler();
});
nebula::WebService::registerHandler("/download", [helperPtr] {
auto handler = new nebula::storage::StorageHttpDownloadHandler();
auto* handler = new nebula::storage::StorageHttpDownloadHandler();
handler->init(helperPtr);
return handler;
});

nebula::WebService::registerHandler("/admin", [&] {
return new nebula::storage::StorageHttpAdminHandler(schemaMan.get(), kvstore.get());
});
status = nebula::WebService::start();
if (!status.ok()) {
return EXIT_FAILURE;
Expand All @@ -192,7 +194,7 @@ int main(int argc, char *argv[]) {
// Setup the signal handlers
setupSignalHandler();

auto handler = std::make_shared<StorageServiceHandler>(kvstore_, schemaMan.get());
auto handler = std::make_shared<StorageServiceHandler>(kvstore.get(), schemaMan.get());
try {
LOG(INFO) << "The storage deamon start on " << localhost;
gServer = std::make_unique<apache::thrift::ThriftServer>();
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ enum ResultCode {
ERR_LEADER_CHANGED = -5,
ERR_INVALID_ARGUMENT = -6,
ERR_IO_ERROR = -7,
ERR_UNSUPPORTED = -8,
ERR_UNKNOWN = -100,
};

Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ class KVEngine {
virtual ResultCode setDBOption(const std::string& configKey,
const std::string& configValue) = 0;

virtual ResultCode compactAll() = 0;
virtual ResultCode compact() = 0;

virtual ResultCode flush() = 0;

protected:
GraphSpaceID spaceId_;
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ class KVStore {
virtual ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID spaceId,
PartitionID partId) = 0;

virtual ResultCode compact(GraphSpaceID spaceId) = 0;

virtual ResultCode flush(GraphSpaceID spaceId) = 0;

protected:
KVStore() = default;
};
Expand Down
104 changes: 62 additions & 42 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,6 @@ DEFINE_int32(num_workers, 4, "Number of worker threads");
return; \
}

/**
* Check spaceId is exist and return related partitions.
*/
#define RETURN_IF_SPACE_NOT_FOUND(spaceId, it) \
it = spaces_.find(spaceId); \
do { \
if (UNLIKELY(it == spaces_.end())) { \
return ResultCode::ERR_SPACE_NOT_FOUND; \
} \
} while (false)

/**
* Check result and return code when it's unsuccess.
* */
#define RETURN_ON_FAILURE(code) \
if (code != ResultCode::SUCCEEDED) { \
return code; \
}


namespace nebula {
namespace kvstore {

Expand Down Expand Up @@ -424,10 +404,12 @@ ErrorOr<ResultCode, std::shared_ptr<Part>> NebulaStore::part(GraphSpaceID spaceI
ResultCode NebulaStore::ingest(GraphSpaceID spaceId,
const std::string& extra,
const std::vector<std::string>& files) {
decltype(spaces_)::iterator it;
folly::RWSpinLock::ReadHolder rh(&lock_);
RETURN_IF_SPACE_NOT_FOUND(spaceId, it);
for (auto& engine : it->second->engines_) {
auto spaceRet = space(spaceId);
if (!ok(spaceRet)) {
return error(spaceRet);
}
auto space = nebula::value(spaceRet);
for (auto& engine : space->engines_) {
auto parts = engine->allParts();
std::vector<std::string> extras;
for (auto part : parts) {
Expand All @@ -442,7 +424,9 @@ ResultCode NebulaStore::ingest(GraphSpaceID spaceId,
}
}
auto code = engine->ingest(std::move(extras));
RETURN_ON_FAILURE(code);
if (code != ResultCode::SUCCEEDED) {
return code;
}
}
return ResultCode::SUCCEEDED;
}
Expand All @@ -451,12 +435,16 @@ ResultCode NebulaStore::ingest(GraphSpaceID spaceId,
ResultCode NebulaStore::setOption(GraphSpaceID spaceId,
const std::string& configKey,
const std::string& configValue) {
decltype(spaces_)::iterator it;
folly::RWSpinLock::ReadHolder rh(&lock_);
RETURN_IF_SPACE_NOT_FOUND(spaceId, it);
for (auto& engine : it->second->engines_) {
auto spaceRet = space(spaceId);
if (!ok(spaceRet)) {
return error(spaceRet);
}
auto space = nebula::value(spaceRet);
for (auto& engine : space->engines_) {
auto code = engine->setOption(configKey, configValue);
RETURN_ON_FAILURE(code);
if (code != ResultCode::SUCCEEDED) {
return code;
}
}
return ResultCode::SUCCEEDED;
}
Expand All @@ -465,24 +453,47 @@ ResultCode NebulaStore::setOption(GraphSpaceID spaceId,
ResultCode NebulaStore::setDBOption(GraphSpaceID spaceId,
const std::string& configKey,
const std::string& configValue) {
decltype(spaces_)::iterator it;
folly::RWSpinLock::ReadHolder rh(&lock_);
RETURN_IF_SPACE_NOT_FOUND(spaceId, it);
for (auto& engine : it->second->engines_) {
auto spaceRet = space(spaceId);
if (!ok(spaceRet)) {
return error(spaceRet);
}
auto space = nebula::value(spaceRet);
for (auto& engine : space->engines_) {
auto code = engine->setDBOption(configKey, configValue);
RETURN_ON_FAILURE(code);
if (code != ResultCode::SUCCEEDED) {
return code;
}
}
return ResultCode::SUCCEEDED;
}


ResultCode NebulaStore::compactAll(GraphSpaceID spaceId) {
decltype(spaces_)::iterator it;
folly::RWSpinLock::ReadHolder rh(&lock_);
RETURN_IF_SPACE_NOT_FOUND(spaceId, it);
for (auto& engine : it->second->engines_) {
auto code = engine->compactAll();
RETURN_ON_FAILURE(code);
ResultCode NebulaStore::compact(GraphSpaceID spaceId) {
auto spaceRet = space(spaceId);
if (!ok(spaceRet)) {
return error(spaceRet);
}
auto space = nebula::value(spaceRet);
for (auto& engine : space->engines_) {
auto code = engine->compact();
if (code != ResultCode::SUCCEEDED) {
return code;
}
}
return ResultCode::SUCCEEDED;
}

ResultCode NebulaStore::flush(GraphSpaceID spaceId) {
auto spaceRet = space(spaceId);
if (!ok(spaceRet)) {
return error(spaceRet);
}
auto space = nebula::value(spaceRet);
for (auto& engine : space->engines_) {
auto code = engine->flush();
if (code != ResultCode::SUCCEEDED) {
return code;
}
}
return ResultCode::SUCCEEDED;
}
Expand Down Expand Up @@ -515,6 +526,15 @@ ErrorOr<ResultCode, KVEngine*> NebulaStore::engine(GraphSpaceID spaceId, Partiti
return partIt->second->engine();
}

ErrorOr<ResultCode, std::shared_ptr<SpacePartInfo>> NebulaStore::space(GraphSpaceID spaceId) {
folly::RWSpinLock::ReadHolder rh(&lock_);
auto it = spaces_.find(spaceId);
if (UNLIKELY(it == spaces_.end())) {
return ResultCode::ERR_SPACE_NOT_FOUND;
}
return it->second;
}

} // namespace kvstore
} // namespace nebula

8 changes: 6 additions & 2 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ class NebulaStore : public KVStore, public Handler {
const std::string& configKey,
const std::string& configValue);

ResultCode compactAll(GraphSpaceID spaceId);
ResultCode compact(GraphSpaceID spaceId) override;

ResultCode flush(GraphSpaceID spaceId) override;

bool isLeader(GraphSpaceID spaceId, PartitionID partId);

Expand All @@ -178,10 +180,12 @@ class NebulaStore : public KVStore, public Handler {

ErrorOr<ResultCode, KVEngine*> engine(GraphSpaceID spaceId, PartitionID partId);

ErrorOr<ResultCode, std::shared_ptr<SpacePartInfo>> space(GraphSpaceID spaceId);

private:
// The lock used to protect spaces_
folly::RWSpinLock lock_;
std::unordered_map<GraphSpaceID, std::unique_ptr<SpacePartInfo>> spaces_;
std::unordered_map<GraphSpaceID, std::shared_ptr<SpacePartInfo>> spaces_;

std::shared_ptr<folly::IOThreadPoolExecutor> ioPool_;
std::shared_ptr<thread::GenericThreadPool> workers_;
Expand Down
13 changes: 12 additions & 1 deletion src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ ResultCode RocksEngine::setDBOption(const std::string& configKey,
}


ResultCode RocksEngine::compactAll() {
ResultCode RocksEngine::compact() {
rocksdb::CompactRangeOptions options;
rocksdb::Status status = db_->CompactRange(options, nullptr, nullptr);
if (status.ok()) {
Expand All @@ -413,5 +413,16 @@ ResultCode RocksEngine::compactAll() {
}
}

ResultCode RocksEngine::flush() {
rocksdb::FlushOptions options;
rocksdb::Status status = db_->Flush(options);
if (status.ok()) {
return ResultCode::SUCCEEDED;
} else {
LOG(ERROR) << "Flush Failed: " << status.ToString();
return ResultCode::ERR_UNKNOWN;
}
}

} // namespace kvstore
} // namespace nebula
4 changes: 3 additions & 1 deletion src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ class RocksEngine : public KVEngine {
ResultCode setDBOption(const std::string& configKey,
const std::string& configValue) override;

ResultCode compactAll() override;
ResultCode compact() override;

ResultCode flush() override;

private:
std::string partKey(PartitionID partId);
Expand Down
10 changes: 9 additions & 1 deletion src/kvstore/plugins/hbase/HBaseStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,15 @@ class HBaseStore : public KVStore {

ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID,
PartitionID) override {
LOG(FATAL) << "Unsupported!";
return ResultCode::ERR_UNSUPPORTED;
}

ResultCode compact(GraphSpaceID) override {
return ResultCode::ERR_UNSUPPORTED;
}

ResultCode flush(GraphSpaceID) override {
return ResultCode::ERR_UNSUPPORTED;
}

private:
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/test/RocksEngineTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ TEST(RocksEngineTest, CompactTest) {
folly::stringPrintf("value_%d", i));
}
EXPECT_EQ(ResultCode::SUCCEEDED, engine->multiPut(std::move(data)));
EXPECT_EQ(ResultCode::SUCCEEDED, engine->compactAll());
EXPECT_EQ(ResultCode::SUCCEEDED, engine->compact());
}

} // namespace kvstore
Expand Down
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ nebula_add_library(
storage_http_handler OBJECT
StorageHttpStatusHandler.cpp
StorageHttpDownloadHandler.cpp
StorageHttpAdminHandler.cpp
)

nebula_add_library(
Expand Down
Loading

0 comments on commit ee8bef3

Please sign in to comment.