Skip to content

Commit

Permalink
Add backup incremental backup collection (#11870)
Browse files Browse the repository at this point in the history
  • Loading branch information
Enjection authored Nov 25, 2024
1 parent 1dd2b99 commit 378c678
Show file tree
Hide file tree
Showing 29 changed files with 422 additions and 56 deletions.
6 changes: 6 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
break;
}

case NKqpProto::TKqpSchemeOperation::kBackupIncremental: {
const auto& modifyScheme = schemeOp.GetBackupIncremental();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,10 @@ class TKikimrIcGateway : public IKqpGateway {
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> BackupIncremental(const TString&, const NYql::TBackupSettings&) override {
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> CreateUser(const TString& cluster, const NYql::TCreateUserSettings& settings) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

Expand Down
43 changes: 43 additions & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,49 @@ class TKqpGatewayProxy : public IKikimrGateway {
}
}

TFuture<TGenericResult> BackupIncremental(const TString& cluster, const NYql::TBackupSettings& settings) override {
CHECK_PREPARED_DDL(BackupIncremental);

try {
if (cluster != SessionCtx->GetCluster()) {
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
}

std::pair<TString, TString> pathPair;
if (settings.Name.StartsWith("/")) {
TString error;
if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, true)) {
return MakeFuture(ResultFromError<TGenericResult>(error));
}
} else {
pathPair.second = ".backups/collections/" + settings.Name;
}

NKikimrSchemeOp::TModifyScheme tx;
tx.SetWorkingDir(GetDatabase());
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpBackupIncrementalBackupCollection);

auto& op = *tx.MutableBackupIncrementalBackupCollection();
op.SetName(pathPair.second);

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
phyTx.MutableSchemeOperation()->MutableBackupIncremental()->Swap(&tx);

TGenericResult result;
result.SetSuccess();
return MakeFuture(result);
} else {
return Gateway->ModifyScheme(std::move(tx));
}
}
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
}

TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) override {
CHECK_PREPARED_DDL(CreateUser);

Expand Down
22 changes: 21 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
return TStatus::Ok;
}

TStatus HandleBackupIncremental(TKiBackupIncremental node, TExprContext& ctx) override {
Y_UNUSED(ctx);
Y_UNUSED(node);
return TStatus::Ok;
}

TStatus HandleCreateUser(TKiCreateUser node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "CreateUser is not yet implemented for intent determination transformer"));
Expand Down Expand Up @@ -605,7 +611,9 @@ class TKikimrDataSink : public TDataProviderBase
return true;
}

if (node.IsCallable(TKiBackup::CallableName())) {
if (node.IsCallable(TKiBackup::CallableName())
|| node.IsCallable(TKiBackupIncremental::CallableName())
) {
return true;
}

Expand Down Expand Up @@ -1515,6 +1523,14 @@ class TKikimrDataSink : public TDataProviderBase
.Prefix().Build(key.GetBackupCollectionPath().Prefix)
.Done()
.Ptr();
} else if (mode == "backupIncremental") {
return Build<TKiBackupIncremental>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.BackupCollection().Build(key.GetBackupCollectionPath().Name)
.Prefix().Build(key.GetBackupCollectionPath().Prefix)
.Done()
.Ptr();
} else {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unknown operation type for backup collection: " << TString(mode)));
return nullptr;
Expand Down Expand Up @@ -1790,6 +1806,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleBackup(node.Cast(), ctx);
}

if (auto node = TMaybeNode<TKiBackupIncremental>(input)) {
return HandleBackupIncremental(node.Cast(), ctx);
}

ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Unsupported function: "
<< callable.CallableName()));
return TStatus::Error;
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2616,6 +2616,28 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
}, "Executing BACKUP");
}

if (auto maybeBackupIncremental = TMaybeNode<TKiBackupIncremental>(input)) {
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
return SyncStatus(requireStatus);
}

auto backupIncremental = maybeBackupIncremental.Cast();

TBackupSettings settings;
settings.Name = TString(backupIncremental.BackupCollection());

auto cluster = TString(backupIncremental.DataSink().Cluster());
auto future = Gateway->BackupIncremental(cluster, settings);

return WrapFuture(future,
[](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
Y_UNUSED(res);
auto resultNode = ctx.NewWorld(input->Pos());
return resultNode;
}, "Executing BACKUP INCREMENTAL");
}

ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder()
<< "(Kikimr DataSink) Failed to execute node: " << input->Content()));
return SyncError();
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,17 @@
{"Index": 2, "Name": "BackupCollection", "Type": "TCoAtom"},
{"Index": 3, "Name": "Prefix", "Type": "TCoAtom"}
]
},
{
"Name": "TKiBackupIncremental",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "KiBackupIncremental!"},
"Children": [
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
{"Index": 2, "Name": "BackupCollection", "Type": "TCoAtom"},
{"Index": 3, "Name": "Prefix", "Type": "TCoAtom"}
]
}
]
}
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,8 @@ class IKikimrGateway : public TThrRefBase {

virtual NThreading::TFuture<TGenericResult> Backup(const TString& cluster, const TBackupSettings& settings) = 0;

virtual NThreading::TFuture<TGenericResult> BackupIncremental(const TString& cluster, const TBackupSettings& settings) = 0;

virtual NThreading::TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) = 0;

virtual NThreading::TFuture<TGenericResult> AlterUser(const TString& cluster, const TAlterUserSettings& settings) = 0;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct TKikimrData {
DataSinkNames.insert(TKiAlterBackupCollection::CallableName());
DataSinkNames.insert(TKiDropBackupCollection::CallableName());
DataSinkNames.insert(TKiBackup::CallableName());
DataSinkNames.insert(TKiBackupIncremental::CallableName());

CommitModes.insert(CommitModeFlush);
CommitModes.insert(CommitModeRollback);
Expand Down Expand Up @@ -128,7 +129,8 @@ struct TKikimrData {
TYdbOperation::CreateBackupCollection |
TYdbOperation::AlterBackupCollection |
TYdbOperation::DropBackupCollection |
TYdbOperation::Backup;
TYdbOperation::Backup |
TYdbOperation::BackupIncremental;

SystemColumns = {
{"_yql_partition_id", NKikimr::NUdf::EDataSlot::Uint64}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class TKikimrTablesData : public TThrRefBase {
NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState;
};

enum class TYdbOperation : ui32 {
enum class TYdbOperation : ui64 {
CreateTable = 1ull << 0,
DropTable = 1ull << 1,
AlterTable = 1ull << 2,
Expand Down Expand Up @@ -249,6 +249,7 @@ enum class TYdbOperation : ui32 {
AlterBackupCollection = 1ull << 29,
DropBackupCollection = 1ull << 30,
Backup = 1ull << 31,
BackupIncremental = 1ull << 32,
};

Y_DECLARE_FLAGS(TYdbOperations, TYdbOperation);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class TKiSinkVisitorTransformer : public TSyncTransformerBase {
virtual TStatus HandleAlterBackupCollection(NNodes::TKiAlterBackupCollection node, TExprContext& ctx) = 0;
virtual TStatus HandleDropBackupCollection(NNodes::TKiDropBackupCollection node, TExprContext& ctx) = 0;
virtual TStatus HandleBackup(NNodes::TKiBackup node, TExprContext& ctx) = 0;
virtual TStatus HandleBackupIncremental(NNodes::TKiBackupIncremental node, TExprContext& ctx) = 0;
};

class TKikimrKey {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2185,6 +2185,11 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
return TStatus::Ok;
}

TStatus HandleBackupIncremental(TKiBackupIncremental node, TExprContext&) override {
node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
return TStatus::Ok;
}

private:
TIntrusivePtr<IKikimrGateway> Gateway;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1655,8 +1655,11 @@ message TModifyScheme {
optional TBackupCollectionDescription AlterBackupCollection = 75;
optional TBackupCollectionDescription DropBackupCollection = 76;
optional TBackupBackupCollection BackupBackupCollection = 78;
optional TBackupBackupCollection BackupIncrementalBackupCollection = 79;

optional TMove MoveSequence = 77;

// Some entries are grouped by semantics, so are out of order
}

message TCopySequence {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp_physical.proto
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ message TKqpSchemeOperation {
NKikimrSchemeOp.TModifyScheme AlterBackupCollection = 46;
NKikimrSchemeOp.TModifyScheme DropBackupCollection = 47;
NKikimrSchemeOp.TModifyScheme Backup = 48;
NKikimrSchemeOp.TModifyScheme BackupIncremental = 49;
}
}

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/protos/schemeshard/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ enum EOperationType {
ESchemeOpAlterBackupCollection = 106;
ESchemeOpDropBackupCollection = 107;
ESchemeOpBackupBackupCollection = 109;
ESchemeOpBackupIncrementalBackupCollection = 110;

// Move sequence
ESchemeOpMoveSequence = 108;

// Some entries are grouped by semantics, so are out of order
}
110 changes: 67 additions & 43 deletions ydb/core/tx/schemeshard/schemeshard__op_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,53 +299,64 @@ struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCol
constexpr inline static bool CreateDirsFromName = true;
};

template <>
struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection> : public TSchemeTxTraitsFallback {
static std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(const TTxTransaction& tx, const TOperationContext& context) {
THashMap<TString, THashSet<TString>> paths;

const auto& backupOp = tx.GetBackupBackupCollection();

const auto& targetDir = backupOp.GetTargetDir();
const TString& targetPath = JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName()});

const TPath& bcPath = TPath::Resolve(targetPath, context.SS);
{
auto checks = bcPath.Check();
checks
.NotEmpty()
.NotUnderDomainUpgrade()
.IsAtLocalSchemeShard()
.IsResolved()
.NotUnderDeleting()
.NotUnderOperation()
.IsBackupCollection();

if (!checks) {
return {};
}
inline TString ToX509String(const TInstant& datetime) {
return datetime.FormatLocalTime("%Y%m%d%H%M%SZ");
}

inline std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(
const TTxTransaction& tx,
const TString& targetDir,
const TString& targetName,
const TOperationContext& context)
{
THashMap<TString, THashSet<TString>> paths;
const TString& targetPath = JoinPath({tx.GetWorkingDir(), targetName});

const TPath& bcPath = TPath::Resolve(targetPath, context.SS);
{
auto checks = bcPath.Check();
checks
.NotEmpty()
.NotUnderDomainUpgrade()
.IsAtLocalSchemeShard()
.IsResolved()
.NotUnderDeleting()
.NotUnderOperation()
.IsBackupCollection();

if (!checks) {
return {};
}
}

Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId));
const auto& bc = context.SS->BackupCollections[bcPath->PathId];
Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId));
const auto& bc = context.SS->BackupCollections[bcPath->PathId];

auto& collectionPaths = paths[targetPath];
auto& collectionPaths = paths[targetPath];
collectionPaths.emplace(targetDir);

for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
std::pair<TString, TString> paths;
TString err;
if (!TrySplitPathByDb(item.GetPath(), tx.GetWorkingDir(), paths, err)) {
return {};
}
for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
std::pair<TString, TString> paths;
TString err;
if (!TrySplitPathByDb(item.GetPath(), tx.GetWorkingDir(), paths, err)) {
return {};
}

auto pathPieces = SplitPath(paths.second);
if (pathPieces.size() > 1) {
auto parent = ExtractParent(paths.second);
collectionPaths.emplace(JoinPath({targetDir, TString(parent)}));
}
auto pathPieces = SplitPath(paths.second);
if (pathPieces.size() > 1) {
auto parent = ExtractParent(paths.second);
collectionPaths.emplace(JoinPath({targetDir, TString(parent)}));
}
}

return paths;
}

return paths;
template <>
struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection> : public TSchemeTxTraitsFallback {
static std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(const TTxTransaction& tx, const TOperationContext& context) {
const auto& backupOp = tx.GetBackupBackupCollection();
return ::NKikimr::NSchemeShard::GetRequiredPaths(tx, backupOp.GetTargetDir(), backupOp.GetName(), context);
}

static bool Rewrite(TTxTransaction& tx) {
Expand All @@ -356,10 +367,23 @@ struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCol

constexpr inline static bool CreateAdditionalDirs = true;
constexpr inline static bool NeedRewrite = true;
private:
static inline TString ToX509String(const TInstant& datetime) {
return datetime.FormatLocalTime("%Y%m%d%H%M%SZ");
};

template <>
struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpBackupIncrementalBackupCollection> : public TSchemeTxTraitsFallback {
static std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(const TTxTransaction& tx, const TOperationContext& context) {
const auto& backupOp = tx.GetBackupIncrementalBackupCollection();
return ::NKikimr::NSchemeShard::GetRequiredPaths(tx, backupOp.GetTargetDir(), backupOp.GetName(), context);
}

static bool Rewrite(TTxTransaction& tx) {
auto now = ToX509String(TlsActivationContext->AsActorContext().Now());
tx.MutableBackupIncrementalBackupCollection()->SetTargetDir(now + "_incremental");
return true;
}

constexpr inline static bool CreateAdditionalDirs = true;
constexpr inline static bool NeedRewrite = true;
};

} // namespace NKikimr::NSchemeShard
Loading

0 comments on commit 378c678

Please sign in to comment.