Skip to content

Commit

Permalink
feat(server): Implemented periodic snapshotting (dragonflydb#161)
Browse files Browse the repository at this point in the history
* feat(test): Added the ability to specify dragonfly cli parameters on a test basis (dragonflydb#199)

Signed-off-by: Braydn <braydn.moore@uwaterloo.ca>
  • Loading branch information
Braydn authored and braydnm committed Aug 21, 2022
1 parent c49e888 commit 2031540
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 25 deletions.
1 change: 1 addition & 0 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rd
testdata/redis6_stream.rdb LABELS DFLY)
cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY)
cxx_test(snapshot_test dragonfly_lib LABELS DFLY)


add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
Expand Down
133 changes: 133 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <mimalloc-types.h>
#include <sys/resource.h>

#include <chrono>
#include <filesystem>

extern "C" {
Expand Down Expand Up @@ -47,6 +48,8 @@ using namespace std;
ABSL_FLAG(string, dir, "", "working directory");
ABSL_FLAG(string, dbfilename, "dump", "the filename to save/load the DB");
ABSL_FLAG(string, requirepass, "", "password for AUTH authentication");
// Periodic snapshotting. The spec is an "HH:MM"-style pattern in which '*' acts as a
// wildcard; an empty value (the default) leaves scheduled snapshots disabled.
ABSL_FLAG(string, save_schedule, "", "glob spec for the time to save a snapshot which matches HH:MM 24h time");
// Selects the clock the schedule is compared against: UTC when true, local time otherwise.
ABSL_FLAG(bool, save_schedule_use_utc, false, "use UTC when specifying the time to save a snapshot");

// Flags defined in other translation units that this file reads.
ABSL_DECLARE_FLAG(uint32_t, port);
ABSL_DECLARE_FLAG(bool, cache_mode);
Expand Down Expand Up @@ -147,6 +150,78 @@ class LinuxWriteWrapper : public io::WriteFile {

} // namespace

// Checks whether one field of a save-schedule spec (the hour part or the minute part) can
// denote a value in the range [0, max].
//
// `time`    - the field's characters, most significant position first. Each character must
//             be a decimal digit or the wildcard '*', which stands for any digit in that
//             position.
// `max`     - the largest value the field may take (the caller passes 23 for hours and 59
//             for minutes).
// `min_len` - kept for interface compatibility; it is not consulted here, length limits are
//             enforced by the caller.
//
// Returns false for an empty field, for a field containing any character other than a digit
// or '*', and for a field whose fixed leading digits already exceed `max`.
bool IsValidSaveScheduleNibble(std::string_view time, unsigned int max,
                               [[maybe_unused]] unsigned int min_len = 2) {
  // 10^18 is the largest power of ten needed for 19 characters and still fits in 64 bits;
  // no real schedule field comes anywhere near this length.
  constexpr size_t kMaxFieldLen = 19;
  if (time.empty() || time.size() > kMaxFieldLen) return false;

  // Validate the alphabet up front. The range check below may return "valid" after looking
  // at the first digit only, so without this pass a field such as "1b" would be accepted.
  for (const char c : time) {
    if (c != '*' && (c < '0' || c > '9')) return false;
  }

  // Place value of the leading character, computed with integer arithmetic.
  unsigned long long place = 1;
  for (size_t i = 1; i < time.size(); ++i) place *= 10;

  for (const char c : time) {
    const unsigned long long current_place = place;
    place /= 10;

    // A wildcard fits any position. `max` is deliberately left untouched: the wildcard may
    // stand for 0, so the remaining digits are compared against the full remaining limit.
    if (c == '*') continue;

    const unsigned long long limit_digit = max / current_place;
    const unsigned long long spec_digit = static_cast<unsigned long long>(c - '0');

    // Only a tie requires looking at the next position: with max 23, "1x" is below the
    // limit for every x, while "3x" is above it for every x.
    if (spec_digit < limit_digit) return true;
    if (spec_digit > limit_digit) return false;

    max = static_cast<unsigned int>(max % current_place);
  }

  return true;
}

// Validates a complete save-schedule spec of the form "<hour>:<minute>".
//
// The hour field is one or two characters, the minute field is exactly two characters or a
// lone '*'. Each field may mix digits with '*' wildcards; the hour must be able to denote a
// value up to 23 and the minute a value up to 59.
bool IsValidSaveSchedule(string_view time) {
  // "H:*" is the shortest possible spec, "HH:MM" the longest.
  const size_t total_len = time.length();
  if (total_len < 3 || total_len > 5) return false;

  // The first ':' has to follow a one- or two-character hour field, i.e. sit at index 1 or 2.
  // This also rejects a leading ':' and specs without a separator among the first three
  // characters.
  const size_t colon = time.find(':');
  if (colon != 1 && colon != 2) return false;

  const string_view hour_part = time.substr(0, colon);
  const string_view minute_part = time.substr(colon + 1);

  // Minutes are zero padded, so two characters are required; the only exception is a single
  // greedy '*' that covers both digits.
  const bool lone_wildcard = (minute_part == "*");
  if (minute_part.length() != 2 && !lone_wildcard) return false;

  if (!IsValidSaveScheduleNibble(hour_part, 23, 1)) return false;
  return IsValidSaveScheduleNibble(minute_part, 59, 2);
}

// Matches a single field of a schedule spec against a numeric value.
//
// [begin, end) walks the field's characters from the least significant position to the most
// significant one. A lone '*' matches any value; otherwise every character must either be
// '*' or equal the decimal digit of `time` at that position, and `time` must not have any
// digits beyond the positions the field provides.
//
// Returns false for an empty field.
bool DoesTimeMatchSpecifier(std::string_view::const_reverse_iterator begin,
                            std::string_view::const_reverse_iterator end, unsigned int time) {
  // Nothing to match against; also keeps the dereference below safe.
  if (begin == end) return false;

  // single greedy wildcard matches everything
  if (*begin == '*' && begin + 1 == end) return true;

  // Compare position by position, starting with the least significant digit.
  for (; begin != end; ++begin) {
    const char expected = static_cast<char>('0' + time % 10);
    if (*begin != '*' && *begin != expected) return false;
    time /= 10;
  }

  // Digits left over in the value mean the field is too short for it: a field of "2" must
  // match 2 only, not 12 or 22.
  return time == 0;
}

// Reports whether the wall-clock time `hour`:`min` satisfies the schedule spec `time`
// ("<hour>:<minute>", with '*' wildcards). A spec without a ':' separator never matches.
bool DoesTimeMatchSpecifier(std::string_view time, unsigned int hour, unsigned int min) {
  const size_t colon = time.find(':');
  if (colon == std::string_view::npos) return false;

  const auto separator = time.begin() + colon;

  // Hour field: the characters before the separator, scanned right to left.
  if (!DoesTimeMatchSpecifier(std::make_reverse_iterator(separator), time.rend(), hour)) {
    return false;
  }

  // Minute field: the characters after the separator, scanned right to left.
  return DoesTimeMatchSpecifier(time.rbegin(), std::make_reverse_iterator(separator + 1), min);
}

ServerFamily::ServerFamily(Service* service) : service_(*service) {
start_time_ = time(NULL);
lsinfo_ = make_shared<LastSaveInfo>();
Expand Down Expand Up @@ -199,6 +274,19 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
if (!load_path.empty()) {
Load(load_path);
}

string save_time = GetFlag(FLAGS_save_schedule);
if (!save_time.empty() && IsValidSaveSchedule(save_time)) {
snapshot_fiber_ = service_.proactor_pool().GetNextProactor()->LaunchFiber([save_time = std::move(save_time), this] {
SnapshotScheduling(std::move(save_time));
});
}
// if the argument is not empty it is an invalid format so print a warning
else if (!save_time.empty()) {
LOG(WARNING)<<"Invalid snapshot time specifier "<<save_time;
}

is_running_ = true;
}

void ServerFamily::Shutdown() {
Expand All @@ -207,6 +295,12 @@ void ServerFamily::Shutdown() {
if (load_fiber_.joinable())
load_fiber_.join();

is_running_ = false;
if (snapshot_fiber_.joinable()) {
snapshot_timer_cv_.notify_all();
snapshot_fiber_.join();
}

pb_task_->Await([this] {
pb_task_->CancelPeriodic(stats_caching_task_);
stats_caching_task_ = 0;
Expand Down Expand Up @@ -264,6 +358,45 @@ void ServerFamily::Load(const std::string& load_path) {
});
}

void ServerFamily::SnapshotScheduling(const string &&time) {
auto timezone = GetFlag(FLAGS_save_schedule_use_utc) ? absl::UTCTimeZone() : absl::LocalTimeZone();
while (is_running_) {
std::unique_lock lk(snapshot_mu_);
snapshot_timer_cv_.wait_for(lk, std::chrono::seconds(20));

absl::Time now = absl::Now();
absl::TimeZone::CivilInfo tz = timezone.At(now);

if (!DoesTimeMatchSpecifier(time, tz.cs.hour(), tz.cs.minute())) {
continue;
}

// if it matches check the last save time, if it is the same minute don't save another snapshot
time_t last_save;
{
lock_guard lk(save_mu_);
last_save = lsinfo_->save_time;
}

if ((last_save / 60) == (absl::ToTimeT(now) / 60)) {
continue;
}

// do the save
string err_details;
error_code ec;
const CommandId* cid = service().FindCmd("SAVE");
CHECK_NOTNULL(cid);
boost::intrusive_ptr<Transaction> trans(new Transaction{cid});
trans->InitByArgs(0, {});
VLOG(1) << "Performing snapshot";
ec = DoSave(trans.get(), &err_details);
if (ec) {
VLOG(1) << "Failed to perform snapshot "<<err_details;
}
}
}

error_code ServerFamily::LoadRdb(const std::string& rdb_file) {
io::ReadonlyFileOrError res = uring::OpenRead(rdb_file);
error_code ec;
Expand Down
9 changes: 7 additions & 2 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once

#include <atomic>
#include "facade/conn_context.h"
#include "facade/redis_parser.h"
#include "server/engine_shard_set.h"
Expand Down Expand Up @@ -117,7 +118,9 @@ class ServerFamily {

void Load(const std::string& file_name);

boost::fibers::fiber load_fiber_;
void SnapshotScheduling(const std::string &&time);

boost::fibers::fiber load_fiber_, snapshot_fiber_;

uint32_t stats_caching_task_ = 0;
Service& service_;
Expand All @@ -126,7 +129,8 @@ class ServerFamily {
util::ListenerInterface* main_listener_ = nullptr;
util::ProactorBase* pb_task_ = nullptr;

mutable ::boost::fibers::mutex replicaof_mu_, save_mu_;
mutable ::boost::fibers::mutex replicaof_mu_, save_mu_, snapshot_mu_;
::boost::fibers::condition_variable snapshot_timer_cv_;
std::shared_ptr<Replica> replica_; // protected by replica_of_mu_

std::unique_ptr<ScriptMgr> script_mgr_;
Expand All @@ -137,6 +141,7 @@ class ServerFamily {

std::shared_ptr<LastSaveInfo> lsinfo_; // protected by save_mu_;
std::atomic_bool is_saving_{false};
std::atomic_bool is_running_{false};
};

} // namespace dfly
106 changes: 106 additions & 0 deletions src/server/snapshot_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include <gtest/gtest.h>
#include <gmock/gmock.h>

#include "base/gtest.h"
#include "server/test_utils.h"

using namespace testing;
using namespace std;
using namespace util;
using namespace facade;
using absl::StrCat;

namespace dfly {

class SnapshotTest : public Test {
protected:
};

bool IsValidSaveSchedule(string_view time);
bool DoesTimeMatchSpecifier(string_view time, unsigned int hour, unsigned int min);

// Every spec listed here must be rejected by IsValidSaveSchedule.
TEST_F(SnapshotTest, InvalidTimes) {
  const string_view kRejectedSpecs[] = {
      // Hour or minute out of range, or with too many digits
      "24:00",
      "00:60",
      "100:00",
      "00:100",

      // Out-of-range value combined with a wildcard
      "23:6*",

      // Minutes must be zero padded
      "00:9",

      // Missing separator, or separator in the first position
      ":12",
      "1234",
      "1",

      // Negative numbers / non numeric characters
      "-1:-2",
      "12:34b",

      // A wildcard appended to an already complete field
      "12*:09",
      "23:45*",
  };

  for (const string_view spec : kRejectedSpecs) {
    EXPECT_FALSE(IsValidSaveSchedule(spec)) << spec;
  }
}

// Every spec listed here must be accepted by IsValidSaveSchedule.
TEST_F(SnapshotTest, ValidTimes) {
  const string_view kAcceptedSpecs[] = {
      // Both ends of the valid range
      "23:59",
      "00:00",

      // The hour does not have to be zero padded
      "0:00",

      // A wildcard in a single digit position
      "1*:09",
      "*9:23",
      "23:*1",
      "18:1*",

      // A greedy wildcard standing for a whole field
      "*:12",
      "9:*",
      "09:*",
      "*:*",
  };

  for (const string_view spec : kAcceptedSpecs) {
    EXPECT_TRUE(IsValidSaveSchedule(spec)) << spec;
  }
}

// Exercises DoesTimeMatchSpecifier with exact specs, single-position wildcards and greedy
// wildcards.
TEST_F(SnapshotTest, TimeMatches) {
  // Exact specs match only their own time
  EXPECT_TRUE(DoesTimeMatchSpecifier("12:34", 12, 34));
  EXPECT_TRUE(DoesTimeMatchSpecifier("2:34", 2, 34));
  EXPECT_TRUE(DoesTimeMatchSpecifier("2:04", 2, 4));

  EXPECT_FALSE(DoesTimeMatchSpecifier("12:34", 2, 4));
  EXPECT_FALSE(DoesTimeMatchSpecifier("2:34", 3, 34));
  EXPECT_FALSE(DoesTimeMatchSpecifier("2:04", 3, 5));

  // Check wildcard for one slot: the wildcard position must accept all ten digits, 0..9
  for (unsigned int digit = 0; digit < 10; ++digit)
    EXPECT_TRUE(DoesTimeMatchSpecifier("1*:34", 10 + digit, 34));

  EXPECT_TRUE(DoesTimeMatchSpecifier("*3:04", 13, 4));
  EXPECT_TRUE(DoesTimeMatchSpecifier("*3:04", 23, 4));

  // do the same checks for the minutes
  for (unsigned int digit = 0; digit < 10; ++digit)
    EXPECT_TRUE(DoesTimeMatchSpecifier("10:3*", 10, 30 + digit));

  // the tens position of a minute ranges over 0..5
  for (unsigned int tens = 0; tens < 6; ++tens)
    EXPECT_TRUE(DoesTimeMatchSpecifier("13:*4", 13, (10 * tens) + 4));

  // check greedy wildcards
  for (unsigned int hour = 0; hour < 24; ++hour)
    EXPECT_TRUE(DoesTimeMatchSpecifier("*:12", hour, 12));

  for (unsigned int minute = 0; minute < 60; ++minute)
    EXPECT_TRUE(DoesTimeMatchSpecifier("3:*", 3, minute));

  for (unsigned int hour = 0; hour < 24; ++hour)
    for (unsigned int minute = 0; minute < 60; ++minute)
      EXPECT_TRUE(DoesTimeMatchSpecifier("*:*", hour, minute));
}

} // namespace dfly
25 changes: 25 additions & 0 deletions tests/pytest/snapshot_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from test_dragonfly import DRAGONFLY_PATH, dfly_args, df_server, client, connection

import redis
from pathlib import Path
import time
import pytest

OUT_DIR = Path(DRAGONFLY_PATH).parent

@pytest.mark.usefixtures("client")
@dfly_args("--dbfilename", "test.rdb",
           "--save_schedule", "*:*",
           "--dir", str(OUT_DIR)+"/")
class TestSnapshot:
    """Checks that a server started with an every-minute save schedule writes a snapshot."""

    def test_snapshot(self, client: redis.Redis):
        """Store a key, remove any existing dump and wait for a scheduled one to appear."""
        out_path = OUT_DIR / "test.rdb"
        client.set("test", "test")

        # Start from a clean slate so that a file found later was written during this test.
        if out_path.exists():
            out_path.unlink()

        # Poll instead of sleeping unconditionally: the test finishes as soon as the
        # snapshot shows up and still allows the same 60 second upper bound.
        deadline = time.monotonic() + 60
        while not out_path.exists() and time.monotonic() < deadline:
            time.sleep(1)

        assert out_path.exists()

Loading

0 comments on commit 2031540

Please sign in to comment.