Skip to content

Commit

Permalink
KESUS: local backups (ydb tools dump / restore) (ydb-platform#14199)
Browse files Browse the repository at this point in the history
  • Loading branch information
jepett0 authored Feb 5, 2025
1 parent 5d66850 commit 7c9f2b3
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 40 deletions.
29 changes: 29 additions & 0 deletions ydb/library/backup/backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,32 @@ void BackupTopic(TDriver driver, const TString& dbPath, const TFsPath& fsBackupF
BackupPermissions(driver, dbPath, fsBackupFolder);
}

namespace {

NCoordination::TNodeDescription DescribeCoordinationNode(TDriver driver, const TString& path) {
NCoordination::TClient client(driver);
auto status = NConsoleClient::RetryFunction([&]() {
return client.DescribeNode(path).ExtractValueSync();
});
VerifyStatus(status, "describe coordination node to build a backup");
return status.ExtractResult();
}

}

void BackupCoordinationNode(TDriver driver, const TString& dbPath, const TFsPath& fsBackupFolder) {
Y_ENSURE(!dbPath.empty());
LOG_I("Backup coordination node " << dbPath.Quote() << " to " << fsBackupFolder.GetPath().Quote());

const auto nodeDescription = DescribeCoordinationNode(driver, dbPath);

Ydb::Coordination::CreateNodeRequest creationRequest;
nodeDescription.SerializeTo(creationRequest);

WriteProtoToFile(creationRequest, fsBackupFolder, NDump::NFiles::CreateCoordinationNode());
BackupPermissions(driver, dbPath, fsBackupFolder);
}

void CreateClusterDirectory(const TDriver& driver, const TString& path, bool rootBackupDir = false) {
if (rootBackupDir) {
LOG_I("Create temporary directory " << path.Quote());
Expand Down Expand Up @@ -700,6 +726,9 @@ void BackupFolderImpl(TDriver driver, const TString& dbPrefix, const TString& ba
if (dbIt.IsTopic()) {
BackupTopic(driver, dbIt.GetFullPath(), childFolderPath);
}
if (dbIt.IsCoordinationNode()) {
BackupCoordinationNode(driver, dbIt.GetFullPath(), childFolderPath);
}
dbIt.Next();
}
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/backup/db_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ class TDbIterator {
return GetCurrentNode()->Type == NScheme::ESchemeEntryType::Topic;
}

bool IsCoordinationNode() const {
return GetCurrentNode()->Type == NScheme::ESchemeEntryType::CoordinationNode;
}

bool IsDir() const {
return GetCurrentNode()->Type == NScheme::ESchemeEntryType::Directory;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/library/backup/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ PEERDIR(
ydb/public/lib/ydb_cli/dump/util
ydb/public/lib/yson_value
ydb/public/lib/ydb_cli/dump/files
ydb/public/sdk/cpp/src/client/coordination
ydb/public/sdk/cpp/src/client/draft
ydb/public/sdk/cpp/src/client/driver
ydb/public/sdk/cpp/src/client/proto
Expand Down
6 changes: 6 additions & 0 deletions ydb/public/lib/ydb_cli/dump/files/files.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ enum EFilesType {
CHANGEFEED_DESCRIPTION,
TOPIC_DESCRIPTION,
CREATE_TOPIC,
CREATE_COORDINATION_NODE,
INCOMPLETE_DATA,
INCOMPLETE,
EMPTY,
Expand All @@ -20,6 +21,7 @@ static constexpr TFileInfo FILES_INFO[] = {
{"changefeed_description.pb", "changefeed"},
{"topic_description.pb", "topic"},
{"create_topic.pb", "topic"},
{"create_coordination_node.pb", "coordination node"},
{"incomplete.csv", "incomplete"},
{"incomplete", "incomplete"},
{"empty_dir", "empty_dir"},
Expand All @@ -46,6 +48,10 @@ const TFileInfo& CreateTopic() {
return FILES_INFO[CREATE_TOPIC];
}

const TFileInfo& CreateCoordinationNode() {
return FILES_INFO[CREATE_COORDINATION_NODE];
}

const TFileInfo& IncompleteData() {
return FILES_INFO[INCOMPLETE_DATA];
}
Expand Down
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/dump/files/files.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const TFileInfo& Permissions();
const TFileInfo& Changefeed();
const TFileInfo& TopicDescription();
const TFileInfo& CreateTopic();
const TFileInfo& CreateCoordinationNode();
const TFileInfo& IncompleteData();
const TFileInfo& Incomplete();
const TFileInfo& Empty();
Expand Down
75 changes: 64 additions & 11 deletions ydb/public/lib/ydb_cli/dump/restore_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ Ydb::Topic::CreateTopicRequest ReadTopicCreationRequest(const TFsPath& fsDirPath
return ReadProtoFromFile<Ydb::Topic::CreateTopicRequest>(fsDirPath, log, NFiles::CreateTopic());
}

Ydb::Coordination::CreateNodeRequest ReadCoordinationNodeCreationRequest(const TFsPath& fsDirPath, const TLog* log) {
return ReadProtoFromFile<Ydb::Coordination::CreateNodeRequest>(fsDirPath, log, NDump::NFiles::CreateCoordinationNode());
}

Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TFsPath& fsDirPath, const TLog* log) {
return ReadProtoFromFile<Ydb::Scheme::ModifyPermissionsRequest>(fsDirPath, log, NFiles::Permissions());
}
Expand Down Expand Up @@ -101,7 +105,7 @@ TStatus WaitForIndexBuild(TOperationClient& client, const TOperation::TOperation
return operation.Status();
}
}
NConsoleClient::ExponentialBackoff(retrySleep, TDuration::Minutes(1));
ExponentialBackoff(retrySleep, TDuration::Minutes(1));
}
}

Expand Down Expand Up @@ -159,12 +163,24 @@ TRestoreResult CheckExistenceAndType(TSchemeClient& client, const TString& dbPat

TStatus CreateTopic(TTopicClient& client, const TString& dbPath, const Ydb::Topic::CreateTopicRequest& request) {
const auto settings = TCreateTopicSettings(request);
auto result = NConsoleClient::RetryFunction([&]() {
auto result = RetryFunction([&]() {
return client.CreateTopic(dbPath, settings).ExtractValueSync();
});
return result;
}

TStatus CreateCoordinationNode(
NCoordination::TClient& client,
const TString& dbPath,
const Ydb::Coordination::CreateNodeRequest& request)
{
const auto settings = NCoordination::TCreateNodeSettings(request.config());
auto result = RetryFunction([&]() {
return client.CreateNode(dbPath, settings).ExtractValueSync();
});
return result;
}

} // anonymous

namespace NPrivate {
Expand Down Expand Up @@ -228,6 +244,7 @@ TRestoreClient::TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog
, SchemeClient(driver)
, TableClient(driver)
, TopicClient(driver)
, CoordinationNodeClient(driver)
, QueryClient(driver)
, Log(log)
{
Expand Down Expand Up @@ -322,29 +339,31 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP
TMaybe<TStatus> result;

switch (entry.Type) {
case ESchemeEntryType::Directory: {
result = NConsoleClient::RemoveDirectoryRecursive(SchemeClient, TableClient, nullptr, &QueryClient,
case ESchemeEntryType::Directory:
result = RemoveDirectoryRecursive(SchemeClient, TableClient, nullptr, &QueryClient,
TString{fullPath}, ERecursiveRemovePrompt::Never, {}, true, false);
break;
}
case ESchemeEntryType::Table: {
case ESchemeEntryType::Table:
result = TableClient.RetryOperationSync([&path = fullPath](TSession session) {
return session.DropTable(path).GetValueSync();
});
break;
}
case ESchemeEntryType::View: {
case ESchemeEntryType::View:
result = QueryClient.RetryQuerySync([&path = fullPath](NQuery::TSession session) {
return session.ExecuteQuery(std::format("DROP VIEW IF EXISTS `{}`;", path),
NQuery::TTxControl::NoTx()).ExtractValueSync();
});
break;
}
case ESchemeEntryType::Topic:
result = NConsoleClient::RetryFunction([&client = TopicClient, &path = fullPath]() {
result = RetryFunction([&client = TopicClient, &path = fullPath]() {
return client.DropTopic(path).ExtractValueSync();
});
break;
case ESchemeEntryType::CoordinationNode:
result = RetryFunction([&client = CoordinationNodeClient, &path = fullPath]() {
return client.DropNode(path).ExtractValueSync();
});
break;
default:
break;
}
Expand Down Expand Up @@ -407,6 +426,10 @@ TRestoreResult TRestoreClient::RestoreFolder(
return RestoreTopic(fsPath, objectDbPath, settings, oldEntries.contains(objectDbPath));
}

if (IsFileExists(fsPath.Child(NFiles::CreateCoordinationNode().FileName))) {
return RestoreCoordinationNode(fsPath, objectDbPath, settings, oldEntries.contains(objectDbPath));
}

if (IsFileExists(fsPath.Child(NFiles::Empty().FileName))) {
return RestoreEmptyDir(fsPath, objectDbPath, settings, oldEntries.contains(objectDbPath));
}
Expand All @@ -426,6 +449,8 @@ TRestoreResult TRestoreClient::RestoreFolder(
ViewRestorationCalls.emplace_back(child, dbRestoreRoot, Join('/', dbPathRelativeToRestoreRoot, child.GetName()), settings, oldEntries.contains(childDbPath));
} else if (IsFileExists(child.Child(NFiles::CreateTopic().FileName))) {
result = RestoreTopic(child, childDbPath, settings, oldEntries.contains(childDbPath));
} else if (IsFileExists(child.Child(NFiles::CreateCoordinationNode().FileName))) {
result = RestoreCoordinationNode(child, childDbPath, settings, oldEntries.contains(childDbPath));
} else if (child.IsDirectory()) {
result = RestoreFolder(child, dbRestoreRoot, Join('/', dbPathRelativeToRestoreRoot, child.GetName()), settings, oldEntries);
}
Expand Down Expand Up @@ -525,6 +550,34 @@ TRestoreResult TRestoreClient::RestoreTopic(
return Result<TRestoreResult>(dbPath, std::move(result));
}

TRestoreResult TRestoreClient::RestoreCoordinationNode(
const TFsPath& fsPath,
const TString& dbPath,
const TRestoreSettings& settings,
bool isAlreadyExisting)
{
LOG_D("Process " << fsPath.GetPath().Quote());

if (auto error = ErrorOnIncomplete(fsPath)) {
return *error;
}

LOG_I("Restore coordination node " << fsPath.GetPath().Quote() << " to " << dbPath.Quote());

if (settings.DryRun_) {
return CheckExistenceAndType(SchemeClient, dbPath, NScheme::ESchemeEntryType::CoordinationNode);
}

const auto creationRequest = ReadCoordinationNodeCreationRequest(fsPath, Log.get());
auto result = CreateCoordinationNode(CoordinationNodeClient, dbPath, creationRequest);
if (result.IsSuccess()) {
LOG_D("Created " << dbPath.Quote());
return RestorePermissions(fsPath, dbPath, settings, isAlreadyExisting);
}
LOG_E("Failed to create " << dbPath.Quote());
return Result<TRestoreResult>(dbPath, std::move(result));
}

TRestoreResult TRestoreClient::RestoreTable(
const TFsPath& fsPath,
const TString& dbPath,
Expand Down Expand Up @@ -862,7 +915,7 @@ TRestoreResult TRestoreClient::RestoreIndexes(const TString& dbPath, const TTabl
return Result<TRestoreResult>(dbPath, std::move(waitForIndexBuildStatus));
}

auto forgetStatus = NConsoleClient::RetryFunction([&]() {
auto forgetStatus = RetryFunction([&]() {
return OperationClient.Forget(buildIndexId).GetValueSync();
});
if (!forgetStatus.IsSuccess()) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/public/lib/ydb_cli/dump/restore_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "dump.h"

#include <ydb-cpp-sdk/client/coordination/coordination.h>
#include <ydb-cpp-sdk/client/import/import.h>
#include <ydb-cpp-sdk/client/operation/operation.h>
#include <ydb-cpp-sdk/client/query/client.h>
Expand Down Expand Up @@ -129,6 +130,7 @@ class TRestoreClient {
TRestoreResult RestoreTable(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
TRestoreResult RestoreView(const TFsPath& fsPath, const TString& dbRestoreRoot, const TString& dbPathRelativeToRestoreRoot, const TRestoreSettings& settings, bool isAlreadyExisting);
TRestoreResult RestoreTopic(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
TRestoreResult RestoreCoordinationNode(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);

TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc);
TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc);
Expand All @@ -154,6 +156,7 @@ class TRestoreClient {
NScheme::TSchemeClient SchemeClient;
NTable::TTableClient TableClient;
NTopic::TTopicClient TopicClient;
NCoordination::TClient CoordinationNodeClient;
NQuery::TQueryClient QueryClient;
std::shared_ptr<TLog> Log;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

namespace Ydb {
namespace Coordination {
class Config;
class CreateNodeRequest;
class DescribeNodeResult;
class SemaphoreSession;
class SemaphoreDescription;
class SemaphoreSession;
}
}

Expand Down Expand Up @@ -107,6 +109,8 @@ class TNodeDescription {
const std::vector<NScheme::TPermissions>& GetEffectivePermissions() const;
const Ydb::Coordination::DescribeNodeResult& GetProto() const;

void SerializeTo(Ydb::Coordination::CreateNodeRequest& creationRequest) const;

private:
struct TImpl;
std::shared_ptr<TImpl> Impl_;
Expand Down Expand Up @@ -189,7 +193,10 @@ struct TNodeSettings : public TOperationRequestSettings<TDerived> {
FLUENT_SETTING_DEFAULT(ERateLimiterCountersMode, RateLimiterCountersMode, ERateLimiterCountersMode::UNSET);
};

struct TCreateNodeSettings : public TNodeSettings<TCreateNodeSettings> { };
struct TCreateNodeSettings : public TNodeSettings<TCreateNodeSettings> {
TCreateNodeSettings() = default;
TCreateNodeSettings(const Ydb::Coordination::Config& config);
};
struct TAlterNodeSettings : public TNodeSettings<TAlterNodeSettings> { };
struct TDropNodeSettings : public TOperationRequestSettings<TDropNodeSettings> { };
struct TDescribeNodeSettings : public TOperationRequestSettings<TDescribeNodeSettings> { };
Expand Down
Loading

0 comments on commit 7c9f2b3

Please sign in to comment.