Skip to content

Commit

Permalink
Backup async replication (ydb-platform#14441)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Feb 11, 2025
1 parent 7a4c2f7 commit 5a8b1e9
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 1 deletion.
110 changes: 110 additions & 0 deletions ydb/library/backup/backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
#include "util.h"

#include <ydb-cpp-sdk/client/cms/cms.h>
#include <ydb-cpp-sdk/client/draft/ydb_replication.h>
#include <ydb-cpp-sdk/client/draft/ydb_view.h>
#include <ydb-cpp-sdk/client/driver/driver.h>
#include <ydb-cpp-sdk/client/proto/accessor.h>
#include <ydb-cpp-sdk/client/result/result.h>
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/topic/client.h>
#include <ydb-cpp-sdk/client/value/value.h>
#include <ydb/public/api/protos/draft/ydb_replication.pb.h>
#include <ydb/public/api/protos/draft/ydb_view.pb.h>
#include <ydb/public/api/protos/ydb_cms.pb.h>
#include <ydb/public/api/protos/ydb_rate_limiter.pb.h>
Expand Down Expand Up @@ -52,6 +54,7 @@

#include <google/protobuf/text_format.h>

#include <format>

namespace NYdb::NBackup {

Expand Down Expand Up @@ -683,6 +686,110 @@ void BackupCoordinationNode(TDriver driver, const TString& dbPath, const TFsPath
BackupPermissions(driver, dbPath, fsBackupFolder);
}

namespace {

NReplication::TReplicationDescription DescribeReplication(TDriver driver, const TString& path) {
NReplication::TReplicationClient client(driver);
auto status = NConsoleClient::RetryFunction([&]() {
return client.DescribeReplication(path).ExtractValueSync();
});
VerifyStatus(status, "describe async replication");
return status.GetReplicationDescription();
}

TString BuildConnectionString(const NReplication::TConnectionParams& params) {
return TStringBuilder()
<< (params.GetEnableSsl() ? "grpcs://" : "grpc://")
<< params.GetDiscoveryEndpoint()
<< "/?database=" << params.GetDatabase();
}

inline TString BuildTarget(const char* src, const char* dst) {
return TStringBuilder() << " `" << src << "` AS `" << dst << "`";
}

inline TString Quote(const char* value) {
return TStringBuilder() << "'" << value << "'";
}

template <typename StringType>
inline TString Quote(const StringType& value) {
return Quote(value.c_str());
}

inline TString BuildOption(const char* key, const TString& value) {
return TStringBuilder() << " " << key << " = " << value << "";
}

inline TString Interval(const TDuration& value) {
return TStringBuilder() << "Interval('PT" << value.Seconds() << "S')";
}

TString BuildCreateReplicationQuery(
const TString& name,
const TString& dbPath,
const NReplication::TReplicationDescription& desc,
const TString& backupRoot,
NYql::TIssues& issues)
{
// TODO(ilnaz)
Y_UNUSED(dbPath);
Y_UNUSED(backupRoot);
Y_UNUSED(issues);

TVector<TString> targets(::Reserve(desc.GetItems().size()));
for (const auto& item : desc.GetItems()) {
if (!item.DstPath.ends_with("/indexImplTable")) { // TODO(ilnaz): get rid of this hack
targets.push_back(BuildTarget(item.SrcPath.c_str(), item.DstPath.c_str()));
}
}

const auto& params = desc.GetConnectionParams();

TVector<TString> opts(::Reserve(5 /* max options */));
opts.push_back(BuildOption("CONNECTION_STRING", Quote(BuildConnectionString(params))));
switch (params.GetCredentials()) {
case NReplication::TConnectionParams::ECredentials::Static:
opts.push_back(BuildOption("USER", Quote(params.GetStaticCredentials().User)));
opts.push_back(BuildOption("PASSWORD_SECRET_NAME", Quote(params.GetStaticCredentials().PasswordSecretName)));
break;
case NReplication::TConnectionParams::ECredentials::OAuth:
opts.push_back(BuildOption("TOKEN_SECRET_NAME", Quote(params.GetOAuthCredentials().TokenSecretName)));
break;
}

opts.push_back(BuildOption("CONSISTENCY_LEVEL", Quote(ToString(desc.GetConsistencyLevel()))));
if (desc.GetConsistencyLevel() == NReplication::TReplicationDescription::EConsistencyLevel::Global) {
opts.push_back(BuildOption("COMMIT_INTERVAL", Interval(desc.GetGlobalConsistency().GetCommitInterval())));
}

return std::format("CREATE ASYNC REPLICATION `{}`\nFOR\n{}\nWITH (\n{}\n);",
name.c_str(), JoinSeq(",\n", targets).c_str(), JoinSeq(",\n", opts).c_str());
}

}

void BackupReplication(
TDriver driver,
const TString& dbBackupRoot,
const TString& dbPathRelativeToBackupRoot,
const TFsPath& fsBackupFolder,
NYql::TIssues& issues)
{
Y_ENSURE(!dbPathRelativeToBackupRoot.empty());
const auto dbPath = JoinDatabasePath(dbBackupRoot, dbPathRelativeToBackupRoot);

LOG_I("Backup async replication " << dbPath.Quote() << " to " << fsBackupFolder.GetPath().Quote());

const auto name = TFsPath(dbPathRelativeToBackupRoot).GetName();
const auto desc = DescribeReplication(driver, dbPath);
const auto creationQuery = BuildCreateReplicationQuery(name, dbPath, desc, dbBackupRoot, issues);
Y_ENSURE(creationQuery, issues.ToString());

WriteCreationQueryToFile(creationQuery, fsBackupFolder, NDump::NFiles::CreateAsyncReplication());
BackupPermissions(driver, dbPath, fsBackupFolder);
}

void CreateClusterDirectory(const TDriver& driver, const TString& path, bool rootBackupDir = false) {
if (rootBackupDir) {
LOG_I("Create temporary directory " << path.Quote() << " in database");
Expand Down Expand Up @@ -776,6 +883,9 @@ void BackupFolderImpl(TDriver driver, const TString& dbPrefix, const TString& ba
if (dbIt.IsCoordinationNode()) {
BackupCoordinationNode(driver, dbIt.GetFullPath(), childFolderPath);
}
if (dbIt.IsReplication()) {
BackupReplication(driver, dbIt.GetTraverseRoot(), dbIt.GetRelPath(), childFolderPath, issues);
}
dbIt.Next();
}
}
Expand Down
18 changes: 17 additions & 1 deletion ydb/library/backup/db_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ class TDbIterator {
TString TraverseRoot;
TDeque<TSchemeEntryWithPath> NextNodes;

static const TVector<NScheme::ESchemeEntryType>& SupportedEntryTypes() {
static const TVector<NScheme::ESchemeEntryType> values = {
NScheme::ESchemeEntryType::Table,
NScheme::ESchemeEntryType::View,
NScheme::ESchemeEntryType::Topic,
NScheme::ESchemeEntryType::CoordinationNode,
NScheme::ESchemeEntryType::Replication,
};

return values;
}

public:
TDbIterator(TDriver driver, const TString& fullPath)
: Client(driver)
Expand All @@ -48,7 +60,7 @@ class TDbIterator {
Y_ENSURE(listResult.IsSuccess(), "Can't list directory, maybe it doesn't exist, dbPath# "
<< fullPath.Quote());

if (IsIn({NScheme::ESchemeEntryType::Table, NScheme::ESchemeEntryType::View}, listResult.GetEntry().Type)) {
if (IsIn(SupportedEntryTypes(), listResult.GetEntry().Type)) {
TPathSplitUnix parentPath(fullPath);
parentPath.pop_back();
TraverseRoot = parentPath.Reconstruct();
Expand Down Expand Up @@ -145,6 +157,10 @@ class TDbIterator {
return GetCurrentNode()->Type == NScheme::ESchemeEntryType::Directory;
}

bool IsReplication() const {
return GetCurrentNode()->Type == NScheme::ESchemeEntryType::Replication;
}

bool IsListed() const {
return NextNodes.front().IsListed;
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/public/lib/ydb_cli/dump/files/files.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ enum EFilesType {
CREATE_USER,
CREATE_GROUP,
ALTER_GROUP,
CREATE_ASYNC_REPLICATION,
};

static constexpr TFileInfo FILES_INFO[] = {
Expand All @@ -36,6 +37,7 @@ static constexpr TFileInfo FILES_INFO[] = {
{"create_user.sql", "users"},
{"create_group.sql", "groups"},
{"alter_group.sql", "group members"},
{"create_async_replication.sql", "async replication"},
};

const TFileInfo& TableScheme() {
Expand Down Expand Up @@ -98,4 +100,8 @@ const TFileInfo& AlterGroup() {
return FILES_INFO[ALTER_GROUP];
}

const TFileInfo& CreateAsyncReplication() {
return FILES_INFO[CREATE_ASYNC_REPLICATION];
}

} // NYdb::NDump::NFiles
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/dump/files/files.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ const TFileInfo& Database();
const TFileInfo& CreateUser();
const TFileInfo& CreateGroup();
const TFileInfo& AlterGroup();
const TFileInfo& CreateAsyncReplication();

} // NYdb::NDump:NFiles

0 comments on commit 5a8b1e9

Please sign in to comment.