Skip to content

Commit

Permalink
Fix incremental backup cdc creation (#12267)
Browse files Browse the repository at this point in the history
  • Loading branch information
Enjection authored Dec 4, 2024
1 parent d215bee commit 42c2025
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 9 deletions.
73 changes: 72 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,77 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
)"));
}

} // Cdc
Y_UNIT_TEST_TWIN(SimpleBackupBackupCollection, WithIncremental) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableChangefeedInitialScan(true)
.SetEnableBackupService(true)
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(1, 10)
, (2, 20)
, (3, 30)
;
)");

ExecSQL(server, edgeActor, R"(
CREATE BACKUP COLLECTION `MyCollection`
( TABLE `/Root/Table`
)
WITH
( STORAGE = 'cluster'
, INCREMENTAL_BACKUP_ENABLED = ')" + TString(WithIncremental ? "true" : "false") + R"('
);
)", false);

ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false);

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
-- TODO: fix with navigate after proper scheme cache handling
SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000001Z_full/Table`
ORDER BY key
)"),
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/Table`
ORDER BY key
)"));


if (WithIncremental) {
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(2, 200);
)");

ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=1;)");

ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false);

SimulateSleep(server, TDuration::Seconds(1));

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
-- TODO: fix with navigate after proper scheme cache handling
SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table`
ORDER BY key
)"),
"{ items { uint32_value: 1 } items { null_flag_value: NULL_VALUE } }, "
"{ items { uint32_value: 2 } items { uint32_value: 200 } }");
}
}

} // Y_UNIT_TEST_SUITE(IncrementalBackup)

} // NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

#include <util/generic/algorithm.h>

NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, bool omitFollowers, bool isBackup, bool allowUnderSameOp) {
NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, const NKikimrSchemeOp::TCopyTableConfig& descr) {
using namespace NKikimr::NSchemeShard;

auto scheme = TransactionTemplate(dst.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable);
Expand All @@ -18,9 +18,13 @@ NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src,
auto operation = scheme.MutableCreateTable();
operation->SetName(dst.LeafName());
operation->SetCopyFromTable(src.PathString());
operation->SetOmitFollowers(omitFollowers);
operation->SetIsBackup(isBackup);
operation->SetAllowUnderSameOperation(allowUnderSameOp);
operation->SetOmitFollowers(descr.GetOmitFollowers());
operation->SetIsBackup(descr.GetIsBackup());
operation->SetAllowUnderSameOperation(descr.GetAllowUnderSameOperation());
if (descr.HasCreateSrcCdcStream()) {
auto* coOp = scheme.MutableCreateCdcStream();
coOp->CopyFrom(descr.GetCreateSrcCdcStream());
}

return scheme;
}
Expand Down Expand Up @@ -144,8 +148,10 @@ bool CreateConsistentCopyTables(
sequences.emplace(sequenceName);
}

result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation()), sequences));
result.push_back(CreateCopyTable(
NextPartId(nextId, result),
CopyTableTask(srcPath, dstPath, descr),
sequences));

TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (const auto& child: srcPath.Base()->GetChildren()) {
Expand Down Expand Up @@ -190,8 +196,9 @@ bool CreateConsistentCopyTables(
Y_ABORT_UNLESS(srcImplTable.Base()->PathId == srcIndexPath.Base()->GetChildren().begin()->second);
TPath dstImplTable = dstIndexPath.Child(srcImplTableName);

result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcImplTable, dstImplTable, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation())));
result.push_back(CreateCopyTable(
NextPartId(nextId, result),
CopyTableTask(srcImplTable, dstImplTable, descr)));
}

for (auto&& sequenceDescription : sequenceDescriptions) {
Expand Down

0 comments on commit 42c2025

Please sign in to comment.