Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support flushdb command #238

Merged
merged 3 commits into from
Jan 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ class CompactCmd : public Cmd {
virtual void Do();

private:
std::string struct_type_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
struct_type_.clear();
}
};

class PurgelogstoCmd : public Cmd {
Expand Down Expand Up @@ -142,6 +146,19 @@ class FlushallCmd : public Cmd {
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class FlushdbCmd : public Cmd {
public:
FlushdbCmd() {}
virtual void Do();

private:
std::string db_name_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
db_name_.clear();
}
};

class ReadonlyCmd : public Cmd {
public:
ReadonlyCmd() : is_open_(false) {}
Expand Down
5 changes: 5 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const std::string kCmdNamePurgelogsto = "purgelogsto";
const std::string kCmdNamePing = "ping";
const std::string kCmdNameSelect = "select";
const std::string kCmdNameFlushall = "flushall";
const std::string kCmdNameFlushdb = "flushdb";
const std::string kCmdNameReadonly = "readonly";
const std::string kCmdNameClient = "client";
const std::string kCmdNameShutdown = "shutdown";
Expand Down Expand Up @@ -297,6 +298,7 @@ class CmdRes {
kInvalidParameter,
kWrongNum,
kInvalidIndex,
kInvalidDbType,
kErrOther,
};

Expand Down Expand Up @@ -361,6 +363,9 @@ class CmdRes {
case kInvalidIndex:
result = "-ERR invalid DB index\r\n";
break;
case kInvalidDbType:
result = "-ERR invalid DB type\r\n";
break;
case kErrOther:
result = "-ERR ";
result.append(message_);
Expand Down
8 changes: 7 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ class PikaServer {
*/
bool ServerInit();

/*
* Nemo options init
*/
void NemoOptionInit(nemo::Options* option);

/*
* Binlog
*/
Expand Down Expand Up @@ -358,8 +363,9 @@ class PikaServer {
}
slash::Mutex & GetSlavesMutex() { return db_sync_protector_; }

//flushall
//flushall & flushdb
bool FlushAll();
bool FlushDb(const std::string& db_name);
void PurgeDir(std::string& path);

/*
Expand Down
56 changes: 54 additions & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,35 @@ void BgsaveoffCmd::Do() {
}

void CompactCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
if (!ptr_info->CheckArg(argv.size())) {
if (!ptr_info->CheckArg(argv.size())
|| argv.size() > 2) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameCompact);
return;
}

if (argv.size() == 2) {
struct_type_ = slash::StringToLower(argv[1]);
}
}

void CompactCmd::Do() {
nemo::Status s = g_pika_server->db()->Compact(nemo::kALL);
nemo::Status s;
if (struct_type_.empty()) {
s = g_pika_server->db()->Compact(nemo::kALL);
} else if (struct_type_ == "string") {
s = g_pika_server->db()->Compact(nemo::kKV_DB);
} else if (struct_type_ == "hash") {
s = g_pika_server->db()->Compact(nemo::kHASH_DB);
} else if (struct_type_ == "set") {
s = g_pika_server->db()->Compact(nemo::kSET_DB);
} else if (struct_type_ == "zset") {
s = g_pika_server->db()->Compact(nemo::kZSET_DB);
} else if (struct_type_ == "list") {
s = g_pika_server->db()->Compact(nemo::kLIST_DB);
} else {
res_.SetRes(CmdRes::kInvalidDbType);
return;
}
if (s.ok()) {
res_.SetRes(CmdRes::kOk);
} else {
Expand Down Expand Up @@ -391,6 +412,37 @@ void FlushallCmd::Do() {
g_pika_server->RWUnlock();
}

void FlushdbCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
if (!ptr_info->CheckArg(argv.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameFlushdb);
return;
}
std::string struct_type = slash::StringToLower(argv[1]);
if (struct_type == "string") {
db_name_ = "kv";
} else if (struct_type == "hash") {
db_name_ = "hash";
} else if (struct_type == "set") {
db_name_ = "set";
} else if (struct_type == "zset") {
db_name_ = "zset";
} else if (struct_type == "list") {
db_name_ = "list";
} else {
res_.SetRes(CmdRes::kInvalidDbType);
}
}

void FlushdbCmd::Do() {
g_pika_server->RWLockWriter();
if (g_pika_server->FlushDb(db_name_)) {
res_.SetRes(CmdRes::kOk);
} else {
res_.SetRes(CmdRes::kErrOther, "There are some bgthread using db now, can not flushdb");
}
g_pika_server->RWUnlock();
}

void ReadonlyCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
if (!ptr_info->CheckArg(argv.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameReadonly);
Expand Down
6 changes: 5 additions & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void InitCmdInfoTable() {
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameBgsave, bgsaveptr));
CmdInfo* bgsaveoffptr = new CmdInfo(kCmdNameBgsaveoff, 1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameBgsaveoff, bgsaveoffptr));
CmdInfo* compactptr = new CmdInfo(kCmdNameCompact, 1, kCmdFlagsRead | kCmdFlagsAdmin);
CmdInfo* compactptr = new CmdInfo(kCmdNameCompact, -1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameCompact, compactptr));
CmdInfo* purgelogptr = new CmdInfo(kCmdNamePurgelogsto, 2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePurgelogsto, purgelogptr));
Expand All @@ -45,6 +45,8 @@ void InitCmdInfoTable() {
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSelect, selectptr));
CmdInfo* flushallptr = new CmdInfo(kCmdNameFlushall, 1, kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameFlushall, flushallptr));
CmdInfo* flushdbptr = new CmdInfo(kCmdNameFlushdb, 2, kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameFlushdb, flushdbptr));
CmdInfo* readonlyptr = new CmdInfo(kCmdNameReadonly, 2, kCmdFlagsRead | kCmdFlagsSuspend | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameReadonly, readonlyptr));
CmdInfo* clientptr = new CmdInfo(kCmdNameClient, -2, kCmdFlagsRead | kCmdFlagsAdmin);
Expand Down Expand Up @@ -496,6 +498,8 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSelect, selectptr));
Cmd* flushallptr = new FlushallCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameFlushall, flushallptr));
Cmd* flushdbptr = new FlushdbCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameFlushdb, flushdbptr));
Cmd* readonlyptr = new ReadonlyCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameReadonly, readonlyptr));
Cmd* clientptr = new ClientCmd();
Expand Down
88 changes: 58 additions & 30 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,7 @@ PikaServer::PikaServer() :
}
// Create nemo handle
nemo::Options option;

option.write_buffer_size = g_pika_conf->write_buffer_size();
option.target_file_size_base = g_pika_conf->target_file_size_base();
option.max_background_flushes = g_pika_conf->max_background_flushes();
option.max_background_compactions = g_pika_conf->max_background_compactions();
option.max_open_files = g_pika_conf->max_cache_files();
option.max_bytes_for_level_multiplier = g_pika_conf->max_bytes_for_level_multiplier();
if (g_pika_conf->compression() == "none") {
option.compression = nemo::Options::CompressionType::kNoCompression;
} else if (g_pika_conf->compression() == "snappy") {
option.compression = nemo::Options::CompressionType::kSnappyCompression;
} else if (g_pika_conf->compression() == "zlib") {
option.compression = nemo::Options::CompressionType::kZlibCompression;
}
NemoOptionInit(&option);

std::string db_path = g_pika_conf->db_path();
LOG(INFO) << "Prepare DB...";
Expand Down Expand Up @@ -246,6 +233,22 @@ bool PikaServer::ServerInit() {

}

void PikaServer::NemoOptionInit(nemo::Options* option) {
option->write_buffer_size = g_pika_conf->write_buffer_size();
option->target_file_size_base = g_pika_conf->target_file_size_base();
option->max_background_flushes = g_pika_conf->max_background_flushes();
option->max_background_compactions = g_pika_conf->max_background_compactions();
option->max_open_files = g_pika_conf->max_cache_files();
option->max_bytes_for_level_multiplier = g_pika_conf->max_bytes_for_level_multiplier();
if (g_pika_conf->compression() == "none") {
option->compression = nemo::Options::CompressionType::kNoCompression;
} else if (g_pika_conf->compression() == "snappy") {
option->compression = nemo::Options::CompressionType::kSnappyCompression;
} else if (g_pika_conf->compression() == "zlib") {
option->compression = nemo::Options::CompressionType::kZlibCompression;
}
}

void PikaServer::Start() {
int ret = 0;
ret = pika_dispatch_thread_->StartThread();
Expand Down Expand Up @@ -1471,6 +1474,10 @@ bool PikaServer::FlushAll() {
return false;
}
}

LOG(INFO) << "Delete old db...";
db_.reset();

std::string dbpath = g_pika_conf->db_path();
if (dbpath[dbpath.length() - 1] == '/') {
dbpath.erase(dbpath.length() - 1);
Expand All @@ -1480,23 +1487,8 @@ bool PikaServer::FlushAll() {
dbpath.append("/deleting");
slash::RenameFile(g_pika_conf->db_path(), dbpath.c_str());

LOG(INFO) << "Delete old db...";
db_.reset();

nemo::Options option;
option.write_buffer_size = g_pika_conf->write_buffer_size();
option.target_file_size_base = g_pika_conf->target_file_size_base();
option.max_background_flushes = g_pika_conf->max_background_flushes();
option.max_background_compactions = g_pika_conf->max_background_compactions();
option.max_open_files = g_pika_conf->max_cache_files();
option.max_bytes_for_level_multiplier = g_pika_conf->max_bytes_for_level_multiplier();
if (g_pika_conf->compression() == "none") {
option.compression = nemo::Options::CompressionType::kNoCompression;
} else if (g_pika_conf->compression() == "snappy") {
option.compression = nemo::Options::CompressionType::kSnappyCompression;
} else if (g_pika_conf->compression() == "zlib") {
option.compression = nemo::Options::CompressionType::kZlibCompression;
}
NemoOptionInit(&option);

LOG(INFO) << "Prepare open new db...";
db_ = std::shared_ptr<nemo::Nemo>(new nemo::Nemo(g_pika_conf->db_path(), option));
Expand All @@ -1505,6 +1497,42 @@ bool PikaServer::FlushAll() {
return true;
}

bool PikaServer::FlushDb(const std::string& db_name) {
{
slash::MutexLock l(&bgsave_protector_);
if (bgsave_info_.bgsaving) {
return false;
}
}
{
slash::MutexLock l(&key_scan_protector_);
if (key_scan_info_.key_scaning_) {
return false;
}
}

std::string db_alias = db_name != "kv" ? db_name : "string";
LOG(INFO) << "Delete old " + db_alias + " db...";
db_.reset();

std::string dbpath = g_pika_conf->db_path();
if (dbpath[dbpath.length() - 1] != '/') {
dbpath.append("/");
}
std::string sub_dbpath = dbpath + db_name;
std::string del_dbpath = dbpath + db_alias + "_deleting";
slash::RenameFile(sub_dbpath, del_dbpath);

nemo::Options option;
NemoOptionInit(&option);

LOG(INFO) << "Prepare open new " + db_alias + " db...";
db_ = std::shared_ptr<nemo::Nemo>(new nemo::Nemo(g_pika_conf->db_path(), option));
LOG(INFO) << "open new " + db_alias + " db success";
PurgeDir(del_dbpath);
return true;
}

void PikaServer::PurgeDir(std::string& path) {
std::string *dir_path = new std::string(path);
// Start new thread if needed
Expand Down