From ee426f6eeed54c935c7381ca5a1527d87ca3d230 Mon Sep 17 00:00:00 2001 From: Daniil Demin Date: Wed, 29 Jan 2025 11:29:03 +0300 Subject: [PATCH] VIEW: import (#13361) --- ydb/core/tx/schemeshard/schemeshard__init.cpp | 13 + .../schemeshard_export__create.cpp | 3 +- ydb/core/tx/schemeshard/schemeshard_impl.cpp | 7 + ydb/core/tx/schemeshard/schemeshard_impl.h | 5 + .../tx/schemeshard/schemeshard_import.cpp | 28 +- .../schemeshard_import__create.cpp | 304 ++++++++++++-- .../schemeshard_import_scheme_getter.cpp | 35 +- ...hemeshard_import_scheme_query_executor.cpp | 178 ++++++++ ...schemeshard_import_scheme_query_executor.h | 9 + .../tx/schemeshard/schemeshard_info_types.h | 7 +- ydb/core/tx/schemeshard/schemeshard_private.h | 33 +- ydb/core/tx/schemeshard/schemeshard_schema.h | 5 + .../ut_export_reboots_s3.cpp | 36 +- .../ut_helpers/export_reboots_common.cpp | 2 +- .../ut_helpers/export_reboots_common.h | 17 +- .../tx/schemeshard/ut_helpers/test_env.cpp | 66 ++- ydb/core/tx/schemeshard/ut_helpers/test_env.h | 1 + .../ut_helpers/ut_backup_restore_common.h | 21 + .../tx/schemeshard/ut_restore/ut_restore.cpp | 385 +++++++++++++++--- ydb/core/tx/schemeshard/ya.make | 2 + ydb/library/backup/backup.cpp | 2 +- ydb/library/services/services.proto | 1 + ydb/public/lib/ydb_cli/dump/restore_impl.cpp | 92 +---- .../lib/ydb_cli/dump/util/view_utils.cpp | 194 ++++++--- ydb/public/lib/ydb_cli/dump/util/view_utils.h | 20 +- .../flat_schemeshard.schema | 14 +- 26 files changed, 1171 insertions(+), 309 deletions(-) create mode 100644 ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp create mode 100644 ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.h diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index f17e8dd97066..a6f26ad50798 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4441,6 +4441,19 @@ struct TSchemeShard::TTxInit : public TTransactionBase { item.Scheme = scheme; } + if (rowset.HaveValue()) { + item.CreationQuery = rowset.GetValue(); + } + + if (rowset.HaveValue()) { + NKikimrSchemeOp::TModifyScheme preparedQuery; + Y_ABORT_UNLESS(ParseFromStringNoSizeLimit( + preparedQuery, + rowset.GetValue() + )); + item.PreparedCreationQuery = std::move(preparedQuery); + } + if (rowset.HaveValue()) { Ydb::Scheme::ModifyPermissionsRequest permissions; Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(permissions, rowset.GetValue())); diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 41e2f6698a57..0b456e3f1f87 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -988,8 +988,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase NIceDb::TNiceDb db(txc.DB); auto& item = exportInfo->Items[itemIdx]; - Self->RunningExportSchemeUploaders.erase(item.SchemeUploader); - item.SchemeUploader = TActorId(); + Self->RunningExportSchemeUploaders.erase(std::exchange(item.SchemeUploader, {})); if (!result.Success) { item.State = EState::Cancelled; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 0640f66b44f4..96e4f30f9eb6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4558,6 +4558,12 @@ void TSchemeShard::Die(const TActorContext &ctx) { for (TActorId schemeUploader : RunningExportSchemeUploaders) { ctx.Send(schemeUploader, new TEvents::TEvPoisonPill()); } + for (TActorId schemeGetter : RunningImportSchemeGetters) { + ctx.Send(schemeGetter, new TEvents::TEvPoisonPill()); + } + for (TActorId schemeQueryExecutor : RunningImportSchemeQueryExecutors) { + ctx.Send(schemeQueryExecutor, new TEvents::TEvPoisonPill()); + } IndexBuildPipes.Shutdown(ctx); CdcStreamScanPipes.Shutdown(ctx); @@ -4855,6 +4861,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvImport::TEvForgetImportRequest, Handle); HFuncTraced(TEvImport::TEvListImportsRequest, Handle); HFuncTraced(TEvPrivate::TEvImportSchemeReady, Handle); + HFuncTraced(TEvPrivate::TEvImportSchemeQueryResult, Handle); // } // NImport // namespace NBackup { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 333c16b3f320..9c545a3f99d8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1260,6 +1260,8 @@ class TSchemeShard THashMap Imports; THashMap ImportsByUid; THashMap> TxIdToImport; + THashSet RunningImportSchemeGetters; + THashSet RunningImportSchemeQueryExecutors; void FromXxportInfo(NKikimrImport::TImport& exprt, const TImportInfo::TPtr importInfo); @@ -1268,6 +1270,7 @@ class TSchemeShard static void PersistImportState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo); static void PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); static void PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); + static void PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); static void PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); struct TImport { @@ -1291,6 +1294,7 @@ class TSchemeShard NTabletFlatExecutor::ITransaction* CreateTxProgressImport(ui64 id, const TMaybe& itemIdx = Nothing()); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeReady::TPtr& ev); + NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvIndexBuilder::TEvCreateResponse::TPtr& ev); @@ -1302,6 +1306,7 @@ class TSchemeShard void Handle(TEvImport::TEvForgetImportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvImport::TEvListImportsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx); void ResumeImports(const TVector& ids, const TActorContext& ctx); // } // NImport diff --git a/ydb/core/tx/schemeshard/schemeshard_import.cpp b/ydb/core/tx/schemeshard/schemeshard_import.cpp index 0ced40d7370c..04f51430691a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import.cpp @@ -63,7 +63,7 @@ void TSchemeShard::FromXxportInfo(NKikimrImport::TImport& import, const TImportI case TImportInfo::EState::Waiting: switch (GetMinState(importInfo)) { case TImportInfo::EState::GetScheme: - case TImportInfo::EState::CreateTable: + case TImportInfo::EState::CreateSchemeObject: import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_PREPARING); break; case TImportInfo::EState::Transferring: @@ -171,11 +171,18 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); const auto& item = importInfo->Items.at(itemIdx); - db.Table().Key(importInfo->Id, itemIdx).Update( + auto record = db.Table().Key(importInfo->Id, itemIdx); + record.Update( NIceDb::TUpdate(item.Scheme.SerializeAsString()) ); + + if (!item.CreationQuery.empty()) { + record.Update( + NIceDb::TUpdate(item.CreationQuery) + ); + } if (item.Permissions.Defined()) { - db.Table().Key(importInfo->Id, itemIdx).Update( + record.Update( NIceDb::TUpdate(item.Permissions->SerializeAsString()) ); } @@ -184,6 +191,17 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf ); } +void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + const auto& item = importInfo->Items[itemIdx]; + + if (item.PreparedCreationQuery) { + db.Table().Key(importInfo->Id, itemIdx).Update( + NIceDb::TUpdate(item.PreparedCreationQuery->SerializeAsString()) + ); + } +} + void TSchemeShard::PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); const auto& item = importInfo->Items.at(itemIdx); @@ -218,6 +236,10 @@ void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TAct Execute(CreateTxProgressImport(ev), ctx); } +void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxProgressImport(ev), ctx); +} + void TSchemeShard::ResumeImports(const TVector& ids, const TActorContext& ctx) { for (const ui64 id : ids) { Execute(CreateTxProgressImport(id), ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index e3845ac57158..c3d9a639966f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -1,15 +1,19 @@ -#include "schemeshard_xxport__tx_base.h" -#include "schemeshard_xxport__helpers.h" -#include "schemeshard_import_flow_proposals.h" -#include "schemeshard_import_scheme_getter.h" -#include "schemeshard_import_helpers.h" -#include "schemeshard_import.h" #include "schemeshard_audit_log.h" #include "schemeshard_impl.h" +#include "schemeshard_import.h" +#include "schemeshard_import_flow_proposals.h" +#include "schemeshard_import_helpers.h" +#include "schemeshard_import_scheme_getter.h" +#include "schemeshard_import_scheme_query_executor.h" +#include "schemeshard_xxport__helpers.h" +#include "schemeshard_xxport__tx_base.h" +#include #include #include #include +#include +#include #include #include @@ -22,6 +26,27 @@ namespace NSchemeShard { using namespace NTabletFlatExecutor; +namespace { + +bool IsWaiting(const TImportInfo::TItem& item) { + return item.State == TImportInfo::EState::Waiting; +} + +bool IsDoneOrWaiting(const TImportInfo::TItem& item) { + return TImportInfo::TItem::IsDone(item) || IsWaiting(item); +} + +// the item is to be created by query, i.e. it is not a table +bool IsCreatedByQuery(const TImportInfo::TItem& item) { + return !item.CreationQuery.empty(); +} + +TString GetDatabase(TSchemeShard& ss) { + return CanonizePath(ss.RootPathElements); +} + +} + struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { TEvImport::TEvCreateImportRequest::TPtr Request; bool Progress; @@ -237,6 +262,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase ui64 Id; TMaybe ItemIdx; TEvPrivate::TEvImportSchemeReady::TPtr SchemeResult = nullptr; + TEvPrivate::TEvImportSchemeQueryResult::TPtr SchemeQueryResult = nullptr; TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr; TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr; TEvIndexBuilder::TEvCreateResponse::TPtr CreateIndexResult = nullptr; @@ -255,6 +281,12 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase { } + explicit TTxProgress(TSelf* self, TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev) + : TXxport::TTxBase(self) + , SchemeQueryResult(ev) + { + } + explicit TTxProgress(TSelf* self, TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev) : TXxport::TTxBase(self) , AllocateResult(ev) @@ -288,6 +320,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (SchemeResult) { OnSchemeResult(txc, ctx); + } else if (SchemeQueryResult) { + OnSchemeQueryPreparation(txc); } else if (AllocateResult) { OnAllocateResult(txc, ctx); } else if (ModifyResult) { @@ -310,13 +344,14 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase private: void GetScheme(TImportInfo::TPtr importInfo, ui32 itemIdx, const TActorContext& ctx) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); - const auto& item = importInfo->Items.at(itemIdx); + auto& item = importInfo->Items.at(itemIdx); LOG_I("TImport::TTxProgress: Get scheme" << ": info# " << importInfo->ToString() << ", item# " << item.ToString(itemIdx)); - ctx.RegisterWithSameMailbox(CreateSchemeGetter(Self->SelfId(), importInfo, itemIdx)); + item.SchemeGetter = ctx.RegisterWithSameMailbox(CreateSchemeGetter(Self->SelfId(), importInfo, itemIdx)); + Self->RunningImportSchemeGetters.emplace(item.SchemeGetter); } void CreateTable(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { @@ -338,6 +373,66 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase Send(Self->SelfId(), std::move(propose)); } + void ExecutePreparedQuery(TTransactionContext& txc, TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + auto& item = importInfo->Items[itemIdx]; + + item.SubState = ESubState::Proposed; + + LOG_I("TImport::TTxProgress: ExecutePreparedQuery" + << ": info# " << importInfo->ToString() + << ", item# " << item.ToString(itemIdx) + << ", txId# " << txId + ); + + Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); + + auto propose = MakeHolder(ui64(txId), Self->TabletID()); + auto& record = propose->Record; + + auto& modifyScheme = *record.AddTransaction(); + modifyScheme = *item.PreparedCreationQuery; + modifyScheme.SetInternal(true); + + if (importInfo->UserSID) { + record.SetOwner(*importInfo->UserSID); + } + FillOwner(record, item.Permissions); + + if (TString error; !FillACL(modifyScheme, item.Permissions, error)) { + NIceDb::TNiceDb db(txc.DB); + return CancelAndPersist(db, importInfo, itemIdx, error, "cannot parse permissions"); + } + + Send(Self->SelfId(), std::move(propose)); + } + + void RetryViewsCreation(TImportInfo::TPtr importInfo, NIceDb::TNiceDb& db, const TActorContext& ctx) { + const auto database = GetDatabase(*Self); + TVector retriedItems; + for (ui32 itemIdx : xrange(importInfo->Items.size())) { + auto& item = importInfo->Items[itemIdx]; + if (IsWaiting(item) && IsCreatedByQuery(item) && item.ViewCreationRetries == 0) { + item.SchemeQueryExecutor = ctx.Register(CreateSchemeQueryExecutor( + Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database + )); + Self->RunningImportSchemeQueryExecutors.emplace(item.SchemeQueryExecutor); + + item.State = EState::CreateSchemeObject; + item.ViewCreationRetries++; + Self->PersistImportItemState(db, importInfo, itemIdx); + + retriedItems.emplace_back(itemIdx); + } + } + if (!retriedItems.empty()) { + LOG_D("TImport::TTxProgress: retry view creation" + << ": id# " << importInfo->Id + << ", retried items# " << JoinSeq(", ", retriedItems) + ); + } + } + void TransferData(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); auto& item = importInfo->Items.at(itemIdx); @@ -481,6 +576,17 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase return TStringBuilder() << importInfo->Id << "-" << itemIdx << "-" << item.NextIndexIdx; } + void KillChildActors(TImportInfo::TItem& item) { + if (auto schemeGetter = std::exchange(item.SchemeGetter, {})) { + Send(schemeGetter, new TEvents::TEvPoisonPill()); + Self->RunningImportSchemeGetters.erase(schemeGetter); + } + if (auto schemeQueryExecutor = std::exchange(item.SchemeQueryExecutor, {})) { + Send(schemeQueryExecutor, new TEvents::TEvPoisonPill()); + Self->RunningImportSchemeQueryExecutors.erase(schemeQueryExecutor); + } + } + void Cancel(TImportInfo::TPtr importInfo, ui32 itemIdx, TStringBuf marker) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); const auto& item = importInfo->Items.at(itemIdx); @@ -492,6 +598,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase importInfo->State = EState::Cancelled; for (ui32 i : xrange(importInfo->Items.size())) { + KillChildActors(importInfo->Items[i]); if (i == itemIdx) { continue; } @@ -515,6 +622,23 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } + void CancelAndPersist(NIceDb::TNiceDb& db, TImportInfo::TPtr importInfo, ui32 itemIdx, TStringBuf itemIssue, TStringBuf marker) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + auto& item = importInfo->Items[itemIdx]; + + item.Issue = itemIssue; + PersistImportItemState(db, importInfo, itemIdx); + + if (importInfo->State != EState::Waiting) { + return; + } + + Cancel(importInfo, itemIdx, marker); + PersistImportState(db, importInfo); + + SendNotificationsIfFinished(importInfo); + } + TMaybe GetIssues(const TPathId& dstPathId, TTxId restoreTxId) { Y_ABORT_UNLESS(Self->Tables.contains(dstPathId)); TTableInfo::TPtr table = Self->Tables.at(dstPathId); @@ -610,11 +734,19 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } break; - case EState::CreateTable: + case EState::CreateSchemeObject: case EState::Transferring: case EState::BuildIndexes: if (item.WaitTxId == InvalidTxId) { - AllocateTxId(importInfo, itemIdx); + if (!IsCreatedByQuery(item) || item.PreparedCreationQuery) { + AllocateTxId(importInfo, itemIdx); + } else { + const auto database = GetDatabase(*Self); + item.SchemeQueryExecutor = ctx.Register(CreateSchemeQueryExecutor( + Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database + )); + Self->RunningImportSchemeQueryExecutors.emplace(item.SchemeQueryExecutor); + } } else { SubscribeTx(importInfo, itemIdx); } @@ -671,13 +803,16 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } - void OnSchemeResult(TTransactionContext& txc, const TActorContext&) { + void OnSchemeResult(TTransactionContext& txc, const TActorContext& ctx) { Y_ABORT_UNLESS(SchemeResult); const auto& msg = *SchemeResult->Get(); LOG_D("TImport::TTxProgress: OnSchemeResult" - << ": id# " << msg.ImportId); + << ": id# " << msg.ImportId + << ", itemIdx# " << msg.ItemIdx + << ", success# " << msg.Success + ); if (!Self->Imports.contains(msg.ImportId)) { LOG_E("TImport::TTxProgress: OnSchemeResult received unknown id" @@ -693,32 +828,103 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase return; } - auto& item = importInfo->Items.at(msg.ItemIdx); NIceDb::TNiceDb db(txc.DB); - TString error; - if (!msg.Success || !CreateTablePropose(Self, TTxId(), importInfo, msg.ItemIdx, error)) { - item.Issue = msg.Success ? error : msg.Error; - Self->PersistImportItemState(db, importInfo, msg.ItemIdx); + auto& item = importInfo->Items.at(msg.ItemIdx); + Self->RunningImportSchemeGetters.erase(std::exchange(item.SchemeGetter, {})); - if (importInfo->State != EState::Waiting) { - return; + if (!msg.Success) { + return CancelAndPersist(db, importInfo, msg.ItemIdx, msg.Error, "cannot get scheme"); + } + if (!IsCreatedByQuery(item)) { + TString error; + if (!CreateTablePropose(Self, TTxId(), importInfo, msg.ItemIdx, error)) { + return CancelAndPersist(db, importInfo, msg.ItemIdx, error, "invalid scheme"); } - - Cancel(importInfo, msg.ItemIdx, "cannot get/invalid scheme"); - Self->PersistImportState(db, importInfo); - - return SendNotificationsIfFinished(importInfo); + } else { + // Send the creation query to KQP to prepare. + const auto database = GetDatabase(*Self); + const TString source = TStringBuilder() + << importInfo->Settings.items(msg.ItemIdx).source_prefix() << NYdb::NDump::NFiles::CreateView().FileName; + + NYql::TIssues issues; + if (!NYdb::NDump::RewriteCreateViewQuery(item.CreationQuery, database, true, item.DstPathName, source, issues)) { + return CancelAndPersist(db, importInfo, msg.ItemIdx, issues.ToString(), "invalid view creation query"); + } + item.SchemeQueryExecutor = ctx.Register(CreateSchemeQueryExecutor( + Self->SelfId(), msg.ImportId, msg.ItemIdx, item.CreationQuery, database + )); + Self->RunningImportSchemeQueryExecutors.emplace(item.SchemeQueryExecutor); } Self->PersistImportItemScheme(db, importInfo, msg.ItemIdx); - item.State = EState::CreateTable; + item.State = EState::CreateSchemeObject; Self->PersistImportItemState(db, importInfo, msg.ItemIdx); - AllocateTxId(importInfo, msg.ItemIdx); + if (!IsCreatedByQuery(item)) { + AllocateTxId(importInfo, msg.ItemIdx); + } + } + + void OnSchemeQueryPreparation(TTransactionContext& txc) { + Y_ABORT_UNLESS(SchemeQueryResult); + const auto& message = *SchemeQueryResult.Get()->Get(); + const TString error = std::holds_alternative(message.Result) ? std::get(message.Result) : ""; + + LOG_D("TImport::TTxProgress: OnSchemeQueryPreparation" + << ": id# " << message.ImportId + << ", itemIdx# " << message.ItemIdx + << ", status# " << message.Status + << ", error# " << error + ); + + auto importInfo = Self->Imports.Value(message.ImportId, nullptr); + if (!importInfo) { + LOG_E("TImport::TTxProgress: OnSchemeQueryPreparation received unknown import id" + << ": id# " << message.ImportId + ); + return; + } + if (message.ItemIdx >= importInfo->Items.size()) { + LOG_E("TImport::TTxProgress: OnSchemeQueryPreparation item index out of range" + << ": id# " << message.ImportId + << ", item index# " << message.ItemIdx + << ", number of items# " << importInfo->Items.size() + ); + return; + } + + NIceDb::TNiceDb db(txc.DB); + + auto& item = importInfo->Items[message.ItemIdx]; + Self->RunningImportSchemeQueryExecutors.erase(std::exchange(item.SchemeQueryExecutor, {})); + + if (message.Status == Ydb::StatusIds::SCHEME_ERROR && item.ViewCreationRetries == 0) { + // Scheme error happens when the view depends on a table (or a view) that is not yet imported. + // Instead of tracking view dependencies, we simply retry the creation of the view later. + item.State = EState::Waiting; + + if (AllOf(importInfo->Items, IsWaiting)) { + // All items are waiting? Cancel the import, or we will end up waiting indefinitely. + return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed"); + } + + Self->PersistImportItemState(db, importInfo, message.ItemIdx); + return; + } + + if (message.Status != Ydb::StatusIds::SUCCESS || !error.empty()) { + return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed"); + } + + if (item.State == EState::CreateSchemeObject) { + item.PreparedCreationQuery = std::get(message.Result); + PersistImportItemPreparedCreationQuery(db, importInfo, message.ItemIdx); + AllocateTxId(importInfo, message.ItemIdx); + } } - void OnAllocateResult(TTransactionContext&, const TActorContext&) { + void OnAllocateResult(TTransactionContext& txc, const TActorContext&) { Y_ABORT_UNLESS(AllocateResult); const auto txId = TTxId(AllocateResult->Get()->TxIds.front()); @@ -748,7 +954,17 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } switch (item.State) { - case EState::CreateTable: + case EState::CreateSchemeObject: + if (item.PreparedCreationQuery) { + ExecutePreparedQuery(txc, importInfo, i, txId); + itemIdx = i; + break; + } + if (IsCreatedByQuery(item)) { + // We only need a txId for modify scheme transactions. + // If an object’s CreationQuery has not been prepared yet, it does not need a txId at this point. + break; + } if (!Self->TableProfilesLoaded) { Self->WaitForTableProfiles(id, i); } else { @@ -819,7 +1035,9 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase Self->TxIdToImport.erase(txId); txId = InvalidTxId; - if (record.GetStatus() == NKikimrScheme::StatusMultipleModifications) { + if (IsIn({ NKikimrScheme::StatusMultipleModifications, NKikimrScheme::StatusAlreadyExists }, + record.GetStatus() + )) { if (record.GetPathCreateTxId()) { txId = TTxId(record.GetPathCreateTxId()); } else if (item.State == EState::Transferring) { @@ -828,17 +1046,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } if (txId == InvalidTxId) { - item.Issue = record.GetReason(); - Self->PersistImportItemState(db, importInfo, itemIdx); - - if (importInfo->State != EState::Waiting) { - return; - } - - Cancel(importInfo, itemIdx, "unhappy propose"); - Self->PersistImportState(db, importInfo); - - return SendNotificationsIfFinished(importInfo); + return CancelAndPersist(db, importInfo, itemIdx, record.GetReason(), "unhappy propose"); } Self->TxIdToImport[txId] = {importInfo->Id, itemIdx}; @@ -852,7 +1060,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase return; } - if (item.State == EState::CreateTable) { + if (item.State == EState::CreateSchemeObject) { auto createPath = TPath::Resolve(item.DstPathName, Self); Y_ABORT_UNLESS(createPath); @@ -932,7 +1140,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase SubscribeTx(importInfo, itemIdx); } - void OnNotifyResult(TTransactionContext& txc, const TActorContext&) { + void OnNotifyResult(TTransactionContext& txc, const TActorContext& ctx) { Y_ABORT_UNLESS(CompletedTxId); LOG_D("TImport::TTxProgress: OnNotifyResult" << ": txId# " << CompletedTxId); @@ -969,7 +1177,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } switch (item.State) { - case EState::CreateTable: + case EState::CreateSchemeObject: + if (IsCreatedByQuery(item)) { + item.State = EState::Done; + break; + } item.State = EState::Transferring; AllocateTxId(importInfo, itemIdx); break; @@ -1008,6 +1220,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (AllOf(importInfo->Items, &TImportInfo::TItem::IsDone)) { importInfo->State = EState::Done; importInfo->EndTime = TAppData::TimeProvider->Now(); + } else if (AllOf(importInfo->Items, IsDoneOrWaiting)) { + RetryViewsCreation(importInfo, db, ctx); } Self->PersistImportItemState(db, importInfo, itemIdx); @@ -1034,6 +1248,10 @@ ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeRe return new TImport::TTxProgress(this, ev); } +ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev) { + return new TImport::TTxProgress(this, ev); +} + ITransaction* TSchemeShard::CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev) { return new TImport::TTxProgress(this, ev); } diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp index ab722b2c5e5c..84b740ad9947 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp @@ -10,6 +10,7 @@ #include #include +#include #include @@ -32,9 +33,9 @@ class TSchemeGetter: public TActorBootstrapped { return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/metadata.json"; } - static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) { + static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, TStringBuf filename) { Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size()); - return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/scheme.pb"; + return TStringBuilder() << settings.items(itemIdx).source_prefix() << '/' << filename; } static TString PermissionsKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) { @@ -42,6 +43,14 @@ class TSchemeGetter: public TActorBootstrapped { return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/permissions.pb"; } + static bool IsView(TStringBuf schemeKey) { + return schemeKey.EndsWith(NYdb::NDump::NFiles::CreateView().FileName); + } + + static bool NoObjectFound(Aws::S3::S3Errors errorType) { + return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY; + } + void HeadObject(const TString& key) { auto request = Model::HeadObjectRequest() .WithKey(key); @@ -71,6 +80,13 @@ class TSchemeGetter: public TActorBootstrapped { << ": self# " << SelfId() << ", result# " << result); + if (!IsView(SchemeKey) && NoObjectFound(result.GetError().GetErrorType())) { + // try search for a view + SchemeKey = SchemeKeyFromSettings(ImportInfo->Settings, ItemIdx, NYdb::NDump::NFiles::CreateView().FileName); + HeadObject(SchemeKey); + return; + } + if (!CheckResult(result, "HeadObject")) { return; } @@ -86,8 +102,7 @@ class TSchemeGetter: public TActorBootstrapped { << ": self# " << SelfId() << ", result# " << result); - if (result.GetError().GetErrorType() == S3Errors::RESOURCE_NOT_FOUND - || result.GetError().GetErrorType() == S3Errors::NO_SUCH_KEY) { + if (NoObjectFound(result.GetError().GetErrorType())) { Reply(); // permissions are optional return; } else if (!CheckResult(result, "HeadObject")) { @@ -176,9 +191,13 @@ class TSchemeGetter: public TActorBootstrapped { LOG_T("Trying to parse scheme" << ": self# " << SelfId() + << ", itemIdx# " << ItemIdx + << ", schemeKey# " << SchemeKey << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n")); - if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &item.Scheme)) { + if (IsView(SchemeKey)) { + item.CreationQuery = msg.Body; + } else if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &item.Scheme)) { return Reply(false, "Cannot parse scheme"); } @@ -230,7 +249,7 @@ class TSchemeGetter: public TActorBootstrapped { StartValidatingChecksum(PermissionsKey, msg.Body, nextStep); } else { nextStep(); - } + } } void HandleChecksum(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) { @@ -351,7 +370,7 @@ class TSchemeGetter: public TActorBootstrapped { , ImportInfo(importInfo) , ItemIdx(itemIdx) , MetadataKey(MetadataKeyFromSettings(importInfo->Settings, itemIdx)) - , SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx)) + , SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx, "scheme.pb")) , PermissionsKey(PermissionsKeyFromSettings(importInfo->Settings, itemIdx)) , Retries(importInfo->Settings.number_of_retries()) , NeedDownloadPermissions(!importInfo->Settings.no_acl()) @@ -411,7 +430,7 @@ class TSchemeGetter: public TActorBootstrapped { const ui32 ItemIdx; const TString MetadataKey; - const TString SchemeKey; + TString SchemeKey; const TString PermissionsKey; const ui32 Retries; diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp new file mode 100644 index 000000000000..267a0679faef --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp @@ -0,0 +1,178 @@ +#include "schemeshard_import_helpers.h" +#include "schemeshard_import_scheme_query_executor.h" +#include "schemeshard_private.h" + +#include +#include +#include +#include +#include +#include + +#include + +using namespace NKikimr::NKqp; + +namespace NKikimr::NSchemeShard { + +class TSchemeQueryExecutor: public TActorBootstrapped { + + std::unique_ptr BuildCompileRequest() { + UserToken.Reset(MakeIntrusive("")); + + TKqpQuerySettings querySettings(NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY); + querySettings.IsInternalCall = true; + + GUCSettings = std::make_shared(); + + TKqpQueryId query( + TString(DefaultKikimrPublicClusterName), // cluster + Database, // database + "", // database id + SchemeQuery, // query text + querySettings, // query settings + nullptr, // query parameter types + *GUCSettings // GUC settings + ); + + // TO DO: get default query timeout from the app config + auto deadline = TAppData::TimeProvider->Now() + TDuration::Minutes(1); + TKqpCounters kqpCounters(AppData()->Counters, &TlsActivationContext->AsActorContext()); + IsInterestedInResult = std::make_shared>(true); + UserRequestContext.Reset(MakeIntrusive()); + + return std::make_unique( + UserToken, // user token + "", // client address + Nothing(), // query uid in query cache + query, // TKqpQueryId + false, // keep in query cache + true, // is query action prepare? + false, // per statement result + deadline, // deadline + kqpCounters.GetDbCounters(Database), // db counters + GUCSettings, // GUC settings + Nothing(), // application name + IsInterestedInResult, // is still interested in result? + UserRequestContext // user request context + ); + } + + void PrepareSchemeQuery() { + if (!Send(MakeKqpCompileServiceID(SelfId().NodeId()), BuildCompileRequest().release())) { + return Finish(Ydb::StatusIds::INTERNAL_ERROR, "cannot send query request"); + } + Become(&TThis::StateExecute); + } + + void HandleCompileResponse(const TEvKqp::TEvCompileResponse::TPtr& ev) { + const auto* result = ev->Get()->CompileResult.get(); + if (!result) { + return Finish(Ydb::StatusIds::GENERIC_ERROR, "empty compile response"); + } + + LOG_D("TSchemeQueryExecutor HandleCompileResponse" + << ", self: " << SelfId() + << ", status: " << result->Status; + ); + + if (result->Status != Ydb::StatusIds::SUCCESS) { + return Finish(result->Status, result->Issues.ToOneLineString()); + } + if (!result->PreparedQuery) { + return Finish(Ydb::StatusIds::GENERIC_ERROR, "no prepared query"); + } + const auto& transactions = result->PreparedQuery->GetPhysicalQuery().GetTransactions(); + if (transactions.empty()) { + return Finish(Ydb::StatusIds::GENERIC_ERROR, "empty transactions"); + } + if (!transactions[0].HasSchemeOperation()) { + return Finish(Ydb::StatusIds::GENERIC_ERROR, "no scheme operations"); + } + if (!transactions[0].GetSchemeOperation().HasCreateView()) { + return Finish(Ydb::StatusIds::GENERIC_ERROR, "no create view operation"); + } + const auto& createView = transactions[0].GetSchemeOperation().GetCreateView(); + Finish(result->Status, createView); + } + + void Finish(Ydb::StatusIds::StatusCode status, std::variant result) { + auto logMessage = TStringBuilder() << "TSchemeQueryExecutor Reply" + << ", self: " << SelfId() + << ", success: " << status; + LOG_I(logMessage); + + std::visit([&](T& value) { + if constexpr (std::is_same_v) { + logMessage << ", error: " << value; + } else if constexpr (std::is_same_v) { + logMessage << ", prepared query: " << value.ShortDebugString().Quote(); + } + LOG_D(logMessage); + Send(ReplyTo, new TEvPrivate::TEvImportSchemeQueryResult(ImportId, ItemIdx, status, std::move(value))); + }, result); + + PassAway(); + } + +public: + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::IMPORT_SCHEME_QUERY_EXECUTOR; + } + + TSchemeQueryExecutor( + TActorId replyTo, + ui64 importId, + ui32 itemIdx, + const TString& schemeQuery, + const TString& database + ) + : ReplyTo(replyTo) + , ImportId(importId) + , ItemIdx(itemIdx) + , SchemeQuery(schemeQuery) + , Database(database) + { + } + + void Bootstrap() { + PrepareSchemeQuery(); + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + + STATEFN(StateExecute) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKqp::TEvCompileResponse, HandleCompileResponse); + default: + return StateBase(ev); + } + } + +private: + + TActorId ReplyTo; + ui64 ImportId; + ui32 ItemIdx; + TString SchemeQuery; + TString Database; + + // The following pointer-type event arguments are necessary for constructing the compile request. + // These pointers must remain valid until the compilation response is received. + TIntrusiveConstPtr UserToken; + TGUCSettings::TPtr GUCSettings; + std::shared_ptr> IsInterestedInResult; + TIntrusivePtr UserRequestContext; + +}; // TSchemeQueryExecutor + +IActor* CreateSchemeQueryExecutor(NActors::TActorId replyTo, ui64 importId, ui32 itemIdx, const TString& schemeQuery, const TString& database) { + return new TSchemeQueryExecutor(replyTo, importId, itemIdx, schemeQuery, database); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.h b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.h new file mode 100644 index 000000000000..550c9c145437 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.h @@ -0,0 +1,9 @@ +#pragma once + +#include + +namespace NKikimr::NSchemeShard { + +NActors::IActor* CreateSchemeQueryExecutor(NActors::TActorId replyTo, ui64 importId, ui32 itemIdx, const TString& creationQuery, const TString& database); + +} diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index a996330c1349..42e45e06683a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2825,7 +2825,7 @@ struct TImportInfo: public TSimpleRefCount { Invalid = 0, Waiting, GetScheme, - CreateTable, + CreateSchemeObject, Transferring, BuildIndexes, Done = 240, @@ -2847,14 +2847,19 @@ struct TImportInfo: public TSimpleRefCount { TString DstPathName; TPathId DstPathId; Ydb::Table::CreateTableRequest Scheme; + TString CreationQuery; + TMaybe PreparedCreationQuery; TMaybeFail Permissions; NBackup::TMetadata Metadata; EState State = EState::GetScheme; ESubState SubState = ESubState::AllocateTxId; TTxId WaitTxId = InvalidTxId; + TActorId SchemeGetter; + TActorId SchemeQueryExecutor; int NextIndexIdx = 0; TString Issue; + int ViewCreationRetries = 0; TItem() = default; diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index ef5909153a89..02aec915e902 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -1,8 +1,13 @@ #pragma once -#include "defs.h" - #include "schemeshard_identificators.h" +#include +#include +#include +#include + +#include + namespace NKikimr { namespace NSchemeShard { @@ -15,6 +20,7 @@ namespace TEvPrivate { EvRunConditionalErase, EvIndexBuildBilling, EvImportSchemeReady, + EvImportSchemeQueryResult, EvExportSchemeUploadResult, EvServerlessStorageBilling, EvCleanDroppedPaths, @@ -96,6 +102,29 @@ namespace TEvPrivate { {} }; + struct TEvImportSchemeQueryResult: public TEventLocal { + const ui64 ImportId; + const ui32 ItemIdx; + const Ydb::StatusIds::StatusCode Status; + const std::variant Result; + + // failed query + TEvImportSchemeQueryResult(ui64 id, ui32 itemIdx, Ydb::StatusIds::StatusCode status, TString&& error) + : ImportId(id) + , ItemIdx(itemIdx) + , Status(status) + , Result(error) + {} + + // successful query + TEvImportSchemeQueryResult(ui64 id, ui32 itemIdx, Ydb::StatusIds::StatusCode status, NKikimrSchemeOp::TModifyScheme&& preparedQuery) + : ImportId(id) + , ItemIdx(itemIdx) + , Status(status) + , Result(preparedQuery) + {} + }; + struct TEvExportSchemeUploadResult: public TEventLocal { const ui64 ExportId; const ui32 ItemIdx; diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 3d8f0b7d5616..ce4c4e26cb33 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1557,6 +1557,9 @@ struct Schema : NIceDb::Schema { struct DstPathOwnerId : Column<4, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; struct DstPathLocalId : Column<5, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; struct Scheme : Column<6, NScheme::NTypeIds::String> {}; + struct CreationQuery : Column<13, NScheme::NTypeIds::Utf8> {}; + // NKikimrSchemeOp::TModifyScheme serialized as string + struct PreparedCreationQuery : Column<14, NScheme::NTypeIds::String> {}; struct Permissions : Column<11, NScheme::NTypeIds::String> {}; struct Metadata : Column<12, NScheme::NTypeIds::String> {}; @@ -1573,6 +1576,8 @@ struct Schema : NIceDb::Schema { DstPathOwnerId, DstPathLocalId, Scheme, + CreationQuery, + PreparedCreationQuery, Permissions, Metadata, State, diff --git a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp index bd880a149a4d..9026d36746de 100644 --- a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp +++ b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp @@ -131,11 +131,11 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { Y_UNIT_TEST(ShouldSucceedOnSingleView) { RunS3({ { + EPathTypeView, R"( Name: "View" QueryText: "some query" - )", - EPathTypeView + )" } }, R"( ExportToS3Settings { @@ -152,19 +152,19 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { Y_UNIT_TEST(ShouldSucceedOnViewsAndTables) { RunS3({ { + EPathTypeView, R"( Name: "View" QueryText: "some query" - )", - EPathTypeView + )" }, { + EPathTypeTable, R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] - )", - EPathTypeTable + )" } }, R"( ExportToS3Settings { @@ -260,11 +260,11 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { Y_UNIT_TEST(CancelShouldSucceedOnSingleView) { CancelS3({ { + EPathTypeView, R"( Name: "View" QueryText: "some query" - )", - EPathTypeView + )" } }, R"( ExportToS3Settings { @@ -281,19 +281,19 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { Y_UNIT_TEST(CancelShouldSucceedOnViewsAndTables) { CancelS3({ { + EPathTypeView, R"( Name: "View" QueryText: "some query" - )", - EPathTypeView + )" }, { + EPathTypeTable, R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] - )", - EPathTypeTable + )" } }, R"( ExportToS3Settings { @@ -389,11 +389,11 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { Y_UNIT_TEST(ForgetShouldSucceedOnSingleView) { ForgetS3({ { + EPathTypeView, R"( Name: "View" QueryText: "some query" - )", - EPathTypeView + )" } }, R"( ExportToS3Settings { @@ -410,19 +410,19 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { Y_UNIT_TEST(ForgetShouldSucceedOnViewsAndTables) { ForgetS3({ { + EPathTypeView, R"( Name: "View" QueryText: "some query" - )", - EPathTypeView + )" }, { + EPathTypeTable, R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] - )", - EPathTypeTable + )" } }, R"( ExportToS3Settings { diff --git a/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp b/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp index 56444c79bdd1..899620a2d2e0 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp @@ -13,7 +13,7 @@ namespace NExportReboots { void CreateSchemeObjects(TTestWithReboots& t, TTestActorRuntime& runtime, const TVector& schemeObjects) { TSet toWait; - for (const auto& [scheme, type] : schemeObjects) { + for (const auto& [type, scheme] : schemeObjects) { switch (type) { case EPathTypeTable: TestCreateTable(runtime, ++t.TxId, "/MyRoot", scheme); diff --git a/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.h b/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.h index 6682b0553547..e3c821433e03 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.h +++ b/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.h @@ -1,6 +1,6 @@ #pragma once -#include +#include "ut_backup_restore_common.h" #include #include @@ -11,21 +11,6 @@ class TTestWithReboots; namespace NExportReboots { -struct TTypedScheme { - TString Scheme; - NKikimrSchemeOp::EPathType Type; - - explicit TTypedScheme(const TString& scheme, NKikimrSchemeOp::EPathType type = NKikimrSchemeOp::EPathTypeTable) - : Scheme(scheme) - , Type(type) - {} - - TTypedScheme(const char* scheme, NKikimrSchemeOp::EPathType type = NKikimrSchemeOp::EPathTypeTable) - : Scheme(scheme) - , Type(type) - {} -}; - void Run(const TVector& schemeObjects, const TString& request, TTestWithReboots& t); void Cancel(const TVector& schemeObjects, const TString& request, TTestWithReboots& t); void Forget(const TVector& schemeObjects, const TString& request, TTestWithReboots& t); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index 1208494d651e..af83393c1363 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -1,9 +1,13 @@ #include "test_env.h" #include "helpers.h" -#include #include +#include #include +#include +#include +#include +#include #include #include #include @@ -12,7 +16,7 @@ #include #include #include -#include +#include #include @@ -504,6 +508,58 @@ NSchemeShardUT_Private::TTestWithReboots::TDatashardLogBatchingSwitch::~TDatasha NKikimr::NDataShard::gAllowLogBatchingDefaultValue = PrevVal; } +void SetupMetadataProvider(TTestActorRuntime& runtime, ui32 nodeIdx) { + NKikimrConfig::TMetadataProviderConfig metadataProviderConfig; + + NMetadata::NProvider::TConfig config; + config.DeserializeFromProto(metadataProviderConfig); + IActor* actor = NMetadata::NProvider::CreateService(config); + + const ui32 userPoolId = runtime.GetAppData(nodeIdx).UserPoolId; + TActorId metadataServiceId = runtime.Register(actor, nodeIdx, userPoolId, TMailboxType::Revolving, 0); + runtime.RegisterService(NMetadata::NProvider::MakeServiceId(runtime.GetNodeId(nodeIdx)), metadataServiceId, nodeIdx); +} + +void SetupKqpResourceManager(TTestActorRuntime& runtime, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, + ui32 nodeIdx +) { + const ui32 nodeId = runtime.GetNodeId(nodeIdx); + auto kqpProxySharedResources = std::make_shared(); + + IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor( + tableServiceConfig.GetResourceManager(), nullptr, {}, kqpProxySharedResources, nodeId + ); + + const ui32 userPoolId = runtime.GetAppData(nodeIdx).UserPoolId; + TActorId kqpRmServiceId = runtime.Register(kqpRmService, nodeIdx, userPoolId); + runtime.RegisterService(NKqp::MakeKqpRmServiceID(nodeId), kqpRmServiceId, nodeIdx); +} + +void SetupKqpProxy(TTestActorRuntime& runtime, ui32 nodeIdx) { + NKikimrConfig::TTableServiceConfig tableServiceConfig; + SetupKqpResourceManager(runtime, tableServiceConfig, nodeIdx); + + NKikimrConfig::TLogConfig logConfig; + NKikimrConfig::TQueryServiceConfig queryServiceConfig; + auto federatedQuerySetupFactory = std::make_shared(); + + IActor* kqpProxyService = NKqp::CreateKqpProxyService( + logConfig, + tableServiceConfig, + queryServiceConfig, + {}, // kqp settings + nullptr, // query replay factory + nullptr, // kqp proxy shared resources + federatedQuerySetupFactory, + nullptr // S3 actors factory + ); + + const ui32 userPoolId = runtime.GetAppData(nodeIdx).UserPoolId; + TActorId kqpProxyServiceId = runtime.Register(kqpProxyService, nodeIdx, userPoolId); + runtime.RegisterService(NKqp::MakeKqpProxyID(runtime.GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx); +} + NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTestEnvOptions& opts, TSchemeShardFactory ssFactory, std::shared_ptr dsExportFactory) : SchemeShardFactory(ssFactory) , HiveState(new TFakeHiveState) @@ -635,6 +691,12 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe runtime.RegisterService(NSequenceProxy::MakeSequenceProxyServiceID(), sequenceProxyId, i); } + if (opts.SetupKqpProxy_) { + for (ui32 node = 0; node < runtime.GetNodeCount(); ++node) { + SetupMetadataProvider(runtime, node); + SetupKqpProxy(runtime, node); + } + } //SetupBoxAndStoragePool(runtime, sender, TTestTxConfig::DomainUid); TxReliablePropose = runtime.Register(new TTxReliablePropose(schemeRoot)); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h index 916ad2489532..debe3ee1af6b 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h @@ -71,6 +71,7 @@ namespace NSchemeShardUT_Private { OPTION(std::optional, EnableTopicAutopartitioningForCDC, std::nullopt); OPTION(std::optional, EnableBackupService, std::nullopt); OPTION(std::optional, EnableTopicTransfer, std::nullopt); + OPTION(bool, SetupKqpProxy, false); #undef OPTION }; diff --git a/ydb/core/tx/schemeshard/ut_helpers/ut_backup_restore_common.h b/ydb/core/tx/schemeshard/ut_helpers/ut_backup_restore_common.h index ec3eb3b93839..eec5d324497c 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ut_backup_restore_common.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ut_backup_restore_common.h @@ -1,3 +1,4 @@ +#include #include using EDataFormat = NKikimr::NDataShard::NBackupRestoreTraits::EDataFormat; @@ -14,3 +15,23 @@ using ECompressionCodec = NKikimr::NDataShard::NBackupRestoreTraits::ECompressio static TTestRegistration##N testRegistration##N; \ template \ void N(NUnitTest::TTestContext&) + +struct TTypedScheme { + NKikimrSchemeOp::EPathType Type; + TString Scheme; + + TTypedScheme(const char* scheme) + : Type(NKikimrSchemeOp::EPathTypeTable) + , Scheme(scheme) + {} + + TTypedScheme(const TString& scheme) + : Type(NKikimrSchemeOp::EPathTypeTable) + , Scheme(scheme) + {} + + TTypedScheme(NKikimrSchemeOp::EPathType type, TString scheme) + : Type(type) + , Scheme(std::move(scheme)) + {} +}; diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index 0e13292c4423..f51381ef4197 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -35,10 +35,11 @@ #include #include -using namespace NKikimr; -using namespace NSchemeShardUT_Private; using namespace NKikimr::NSchemeShard; using namespace NKikimr::NWrappers::NTestHelpers; +using namespace NKikimr; +using namespace NKikimrSchemeOp; +using namespace NSchemeShardUT_Private; namespace { @@ -54,7 +55,7 @@ namespace { const TString EmptyYsonStr = R"([[[[];%false]]])"; - TString GenerateScheme(const NKikimrSchemeOp::TPathDescription& pathDesc) { + TString GenerateScheme(const TPathDescription& pathDesc) { UNIT_ASSERT(pathDesc.HasTable()); const auto& tableDesc = pathDesc.GetTable(); @@ -88,7 +89,7 @@ namespace { UNIT_ASSERT(describeResult.GetPathDescription().HasTable()); const auto& tableDesc = describeResult.GetPathDescription().GetTable(); - NKikimrSchemeOp::TTableDescription scheme; + TTableDescription scheme; scheme.MutableColumns()->CopyFrom(tableDesc.GetColumns()); scheme.MutableKeyColumnNames()->CopyFrom(tableDesc.GetKeyColumnNames()); @@ -140,7 +141,9 @@ namespace { struct TTestDataWithScheme { TString Metadata; + EPathType Type = EPathTypeTable; TString Scheme; + TString CreationQuery; TString Permissions; TVector Data; @@ -245,18 +248,29 @@ namespace { } TTestDataWithScheme GenerateTestData( - const TString& scheme, - const TVector>& shardsConfig, + const TTypedScheme& typedScheme, + const TVector>& shardsConfig = {{"a", 1}}, const TString& permissions = "", - const TString& metadata = "") - { + const TString& metadata = "" + ) { TTestDataWithScheme result; - result.Scheme = scheme; + result.Type = typedScheme.Type; result.Permissions = permissions; result.Metadata = metadata; - for (const auto& [keyPrefix, count] : shardsConfig) { - result.Data.push_back(GenerateTestData(keyPrefix, count)); + switch (typedScheme.Type) { + case EPathTypeTable: + result.Scheme = typedScheme.Scheme; + for (const auto& [keyPrefix, count] : shardsConfig) { + result.Data.emplace_back(GenerateTestData(keyPrefix, count)); + } + break; + case EPathTypeView: + result.CreationQuery = typedScheme.Scheme; + break; + default: + UNIT_FAIL("cannot create sample test data for the scheme object type: " << typedScheme.Type); + return {}; } return result; @@ -266,13 +280,24 @@ namespace { THashMap result; for (const auto& [prefix, item] : data) { - result.emplace(prefix + "/scheme.pb", item.Scheme); + switch (item.Type) { + case EPathTypeTable: + result.emplace(prefix + "/scheme.pb", item.Scheme); + break; + case EPathTypeView: + result.emplace(prefix + "/create_view.sql", item.CreationQuery); + break; + default: + UNIT_FAIL("cannot determine key for the scheme object type: " << item.Type); + return {}; + } + if (item.Metadata) { result.emplace(prefix + "/metadata.json", item.Metadata); } else { result.emplace(prefix + "/metadata.json", R"({"version": 0})"); // without checksums } - + if (item.Permissions) { result.emplace(prefix + "/permissions.pb", item.Permissions); } @@ -412,11 +437,11 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { NKqp::CompareYson(data.YsonStr, content); } - bool CheckDefaultFromSequence(const NKikimrSchemeOp::TTableDescription& desc) { + bool CheckDefaultFromSequence(const TTableDescription& desc) { for (const auto& column: desc.GetColumns()) { if (column.GetName() == "key") { switch (column.GetDefaultValueCase()) { - case NKikimrSchemeOp::TColumnDescription::kDefaultFromSequence: { + case TColumnDescription::kDefaultFromSequence: { const auto& fromSequence = column.GetDefaultFromSequence(); return fromSequence == "myseq"; } @@ -428,11 +453,11 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { return false; } - bool CheckDefaultFromLiteral(const NKikimrSchemeOp::TTableDescription& desc) { + bool CheckDefaultFromLiteral(const TTableDescription& desc) { for (const auto& column: desc.GetColumns()) { if (column.GetName() == "value") { switch (column.GetDefaultValueCase()) { - case NKikimrSchemeOp::TColumnDescription::kDefaultFromLiteral: { + case TColumnDescription::kDefaultFromLiteral: { const auto& fromLiteral = column.GetDefaultFromLiteral(); TString str; @@ -1207,7 +1232,7 @@ value { const auto secondTablet = TTestTxConfig::FakeHiveTablets + 1; UpdateRow(runtime, "Original", 1, "valueA", firstTablet); UpdateRow(runtime, "Original", 2, "valueB", secondTablet); - + // Add delay after copying tables ui64 copyTablesTxId; auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr& ev) { @@ -1813,7 +1838,7 @@ value { ui64 txId = 100; // prepare table schema with special policy - NKikimrSchemeOp::TTableDescription desc; + TTableDescription desc; desc.SetName("Table"); desc.AddKeyColumnNames("key"); { @@ -2994,7 +3019,7 @@ Y_UNIT_TEST_SUITE(TImportTests) { } } )", { - NLs::HasTtlEnabled("modified_at", TDuration::Hours(2), NKikimrSchemeOp::TTTLSettings::UNIT_SECONDS), + NLs::HasTtlEnabled("modified_at", TDuration::Hours(2), TTTLSettings::UNIT_SECONDS), }); } @@ -3558,7 +3583,7 @@ Y_UNIT_TEST_SUITE(TImportTests) { } return ev->Get()->Record - .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexedTable; + .GetTransaction(0).GetOperationType() == ESchemeOpCreateIndexedTable; }); } @@ -3569,7 +3594,7 @@ Y_UNIT_TEST_SUITE(TImportTests) { } return ev->Get()->Record - .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpRestore; + .GetTransaction(0).GetOperationType() == ESchemeOpRestore; }); } @@ -3580,7 +3605,7 @@ Y_UNIT_TEST_SUITE(TImportTests) { } return ev->Get()->Record - .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpApplyIndexBuild; + .GetTransaction(0).GetOperationType() == ESchemeOpApplyIndexBuild; }); } @@ -3791,7 +3816,7 @@ Y_UNIT_TEST_SUITE(TImportTests) { } return ev->Get()->Record - .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpRestore; + .GetTransaction(0).GetOperationType() == ESchemeOpRestore; }; THolder delayed; @@ -3961,7 +3986,7 @@ Y_UNIT_TEST_SUITE(TImportTests) { } return ev->Get()->Record - .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpRestore; + .GetTransaction(0).GetOperationType() == ESchemeOpRestore; }; THolder delayed; @@ -4636,42 +4661,156 @@ Y_UNIT_TEST_SUITE(TImportTests) { )", TTestTxConfig::FakeHiveTablets)); env.TestWaitNotification(runtime, txId); } + + Y_UNIT_TEST(ViewCreationRetry) { + TTestBasicRuntime runtime; + auto options = TTestEnvOptions() + .RunFakeConfigDispatcher(true) + .SetupKqpProxy(true); + TTestEnv env(runtime, options); + runtime.GetAppData().FeatureFlags.SetEnableViews(true); + runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); + ui64 txId = 100; + + THashMap bucketContent(2); + bucketContent.emplace("/table", GenerateTestData(R"( + columns { + name: "key" + type { optional_type { item { type_id: UTF8 } } } + } + columns { + name: "value" + type { optional_type { item { type_id: UTF8 } } } + } + primary_key: "key" + )")); + bucketContent.emplace("/view", GenerateTestData( + { + EPathTypeView, + R"( + -- backup root: "/MyRoot" + CREATE VIEW IF NOT EXISTS `view` WITH security_invoker = TRUE AS SELECT * FROM `table`; + )" + } + )); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock(ConvertTestData(bucketContent), TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + ui64 tableCreationTxId = 0; + TActorId schemeshardActorId; + TBlockEvents tableCreationBlocker(runtime, + [&](const TEvSchemeShard::TEvModifySchemeTransaction::TPtr& event) { + const auto& record = event->Get()->Record; + if (record.GetTransaction(0).GetOperationType() == ESchemeOpCreateIndexedTable) { + tableCreationTxId = record.GetTxId(); + schemeshardActorId = event->Recipient; + return true; + } + return false; + } + ); + + TBlockEvents queryResultBlocker(runtime, + [&](const TEvPrivate::TEvImportSchemeQueryResult::TPtr& event) { + // When we receive the scheme query result message, we can be sure that the SchemeShard actor ID is set, + // because the table is the first item on the import list. + if (!schemeshardActorId || event->Recipient != schemeshardActorId) { + return false; + } + UNIT_ASSERT_VALUES_EQUAL(event->Get()->Status, Ydb::StatusIds::SCHEME_ERROR); + const auto* error = std::get_if(&event->Get()->Result); + UNIT_ASSERT(error); + UNIT_ASSERT_STRING_CONTAINS(*error, "Cannot find table"); + return true; + } + ); + + const ui64 importId = ++txId; + TestImport(runtime, importId, "/MyRoot", Sprintf(R"( + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "table" + destination_path: "/MyRoot/table" + } + items { + source_prefix: "view" + destination_path: "/MyRoot/view" + } + } + )", port)); + + runtime.WaitFor("table creation attempt", [&]{ return !tableCreationBlocker.empty(); }); + runtime.WaitFor("query result", [&]{ return !queryResultBlocker.empty(); }); + tableCreationBlocker.Unblock().Stop(); + queryResultBlocker.Unblock().Stop(); + env.TestWaitNotification(runtime, tableCreationTxId); + + env.TestWaitNotification(runtime, importId); + TestGetImport(runtime, importId, "/MyRoot"); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/view"), { + NLs::Finished, + NLs::IsView + }); + } } Y_UNIT_TEST_SUITE(TImportWithRebootsTests) { - void ShouldSucceed(const TString& scheme) { + + constexpr TStringBuf DefaultImportSettings = R"( + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "" + destination_path: "/MyRoot/Table" + } + } + )"; + + void ShouldSucceed(const THashMap& schemes, TStringBuf importSettings = DefaultImportSettings) { TPortManager portManager; const ui16 port = portManager.GetPort(); - const auto data = GenerateTestData(scheme, {{"a", 1}}); - - TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port)); + THashMap bucketContent(schemes.size()); + for (const auto& [prefix, typedScheme] : schemes) { + bucketContent.emplace(prefix, GenerateTestData(typedScheme)); + } + TS3Mock s3Mock(ConvertTestData(bucketContent), TS3Mock::TSettings(port)); UNIT_ASSERT(s3Mock.Start()); TTestWithReboots t; + const bool createsViews = AnyOf(schemes, [](const auto& scheme) { + return scheme.second.Type == EPathTypeView; + }); + if (createsViews) { + t.GetTestEnvOptions().RunFakeConfigDispatcher(true); + t.GetTestEnvOptions().SetupKqpProxy(true); + } t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { TInactiveZone inactive(activeZone); runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); + if (createsViews) { + runtime.GetAppData().FeatureFlags.SetEnableViews(true); + } } - AsyncImport(runtime, ++t.TxId, "/MyRoot", Sprintf(R"( - ImportFromS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_prefix: "" - destination_path: "/MyRoot/Table" - } - } - )", port)); - t.TestEnv->TestWaitNotification(runtime, t.TxId); + const ui64 importId = ++t.TxId; + AsyncImport(runtime, importId, "/MyRoot", Sprintf(importSettings.data(), port)); + t.TestEnv->TestWaitNotification(runtime, importId); { TInactiveZone inactive(activeZone); - TestGetImport(runtime, t.TxId, "/MyRoot", { + TestGetImport(runtime, importId, "/MyRoot", { Ydb::StatusIds::SUCCESS, Ydb::StatusIds::NOT_FOUND }); @@ -4679,6 +4818,10 @@ Y_UNIT_TEST_SUITE(TImportWithRebootsTests) { }); } + void ShouldSucceed(const TTypedScheme& scheme) { + ShouldSucceed({{"", scheme}}); + } + Y_UNIT_TEST(ShouldSucceedOnSimpleTable) { ShouldSucceed(R"( columns { @@ -4712,36 +4855,96 @@ Y_UNIT_TEST_SUITE(TImportWithRebootsTests) { )"); } - void CancelShouldSucceed(const TString& scheme) { + Y_UNIT_TEST(ShouldSucceedOnSingleView) { + ShouldSucceed( + { + EPathTypeView, + R"( + -- backup root: "/MyRoot" + CREATE VIEW IF NOT EXISTS `view` WITH security_invoker = TRUE AS SELECT 1; + )" + } + ); + } + + Y_UNIT_TEST(ShouldSucceedOnViewsAndTables) { + ShouldSucceed( + { + { + "/view", + { + EPathTypeView, + R"( + -- backup root: "/MyRoot" + CREATE VIEW IF NOT EXISTS `view` WITH security_invoker = TRUE AS SELECT 1; + )" + } + }, { + "/table", + { + EPathTypeTable, + R"( + columns { + name: "key" + type { optional_type { item { type_id: UTF8 } } } + } + columns { + name: "value" + type { optional_type { item { type_id: UTF8 } } } + } + primary_key: "key" + )" + } + } + }, R"( + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "view" + destination_path: "/MyRoot/View" + } + items { + source_prefix: "table" + destination_path: "/MyRoot/Table" + } + } + )" + ); + } + + void CancelShouldSucceed(const THashMap& schemes, TStringBuf importSettings = DefaultImportSettings) { TPortManager portManager; const ui16 port = portManager.GetPort(); - const auto data = GenerateTestData(scheme, {{"a", 1}}); - - TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port)); + THashMap bucketContent(schemes.size()); + for (const auto& [prefix, typedScheme] : schemes) { + bucketContent.emplace(prefix, GenerateTestData(typedScheme)); + } + TS3Mock s3Mock(ConvertTestData(bucketContent), TS3Mock::TSettings(port)); UNIT_ASSERT(s3Mock.Start()); TTestWithReboots t; + const bool createsViews = AnyOf(schemes, [](const auto& scheme) { + return scheme.second.Type == EPathTypeView; + }); + if (createsViews) { + t.GetTestEnvOptions().RunFakeConfigDispatcher(true); + t.GetTestEnvOptions().SetupKqpProxy(true); + } t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { TInactiveZone inactive(activeZone); runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); + if (createsViews) { + runtime.GetAppData().FeatureFlags.SetEnableViews(true); + } } - AsyncImport(runtime, ++t.TxId, "/MyRoot", Sprintf(R"( - ImportFromS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_prefix: "" - destination_path: "/MyRoot/Table" - } - } - )", port)); - t.TestEnv->TestWaitNotification(runtime, t.TxId); - const ui64 importId = t.TxId; + const ui64 importId = ++t.TxId; + AsyncImport(runtime, importId, "/MyRoot", Sprintf(importSettings.data(), port)); t.TestEnv->ReliablePropose(runtime, CancelImportRequest(++t.TxId, "/MyRoot", importId), { Ydb::StatusIds::SUCCESS, @@ -4751,15 +4954,23 @@ Y_UNIT_TEST_SUITE(TImportWithRebootsTests) { { TInactiveZone inactive(activeZone); - TestGetImport(runtime, importId, "/MyRoot", { + const auto response = TestGetImport(runtime, importId, "/MyRoot", { Ydb::StatusIds::SUCCESS, Ydb::StatusIds::CANCELLED, Ydb::StatusIds::NOT_FOUND }); + const auto& entry = response.GetResponse().GetEntry(); + if (entry.GetStatus() == Ydb::StatusIds::CANCELLED) { + UNIT_ASSERT_STRING_CONTAINS(NYql::IssuesFromMessageAsString(entry.GetIssues()), "Cancelled manually"); + } } }); } + void CancelShouldSucceed(const TTypedScheme& scheme) { + CancelShouldSucceed({{"", scheme}}); + } + Y_UNIT_TEST(CancelShouldSucceedOnSimpleTable) { CancelShouldSucceed(R"( columns { @@ -4792,4 +5003,62 @@ Y_UNIT_TEST_SUITE(TImportWithRebootsTests) { } )"); } + + Y_UNIT_TEST(CancelShouldSucceedOnSingleView) { + CancelShouldSucceed( + { + EPathTypeView, + R"( + -- backup root: "/MyRoot" + CREATE VIEW IF NOT EXISTS `view` WITH security_invoker = TRUE AS SELECT 1; + )" + } + ); + } + + Y_UNIT_TEST(CancelShouldSucceedOnViewsAndTables) { + CancelShouldSucceed( + { + { + "/view", + { + EPathTypeView, + R"( + -- backup root: "/MyRoot" + CREATE VIEW IF NOT EXISTS `view` WITH security_invoker = TRUE AS SELECT 1; + )" + } + }, { + "/table", + { + EPathTypeTable, + R"( + columns { + name: "key" + type { optional_type { item { type_id: UTF8 } } } + } + columns { + name: "value" + type { optional_type { item { type_id: UTF8 } } } + } + primary_key: "key" + )" + } + } + }, R"( + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "view" + destination_path: "/MyRoot/View" + } + items { + source_prefix: "table" + destination_path: "/MyRoot/Table" + } + } + )" + ); + } } diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 9596654f4700..c5bd695cfc36 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -235,6 +235,7 @@ SRCS( schemeshard_import__get.cpp schemeshard_import__list.cpp schemeshard_import_flow_proposals.cpp + schemeshard_import_scheme_query_executor.cpp schemeshard_info_types.cpp schemeshard_info_types.h schemeshard_path.cpp @@ -309,6 +310,7 @@ PEERDIR( ydb/library/login ydb/library/login/protos ydb/library/protobuf_printer + ydb/public/lib/ydb_cli/dump/files ydb/public/lib/ydb_cli/dump/util yql/essentials/minikql yql/essentials/providers/common/proto diff --git a/ydb/library/backup/backup.cpp b/ydb/library/backup/backup.cpp index 9ece91f28e31..2f7db4e15729 100644 --- a/ydb/library/backup/backup.cpp +++ b/ydb/library/backup/backup.cpp @@ -546,7 +546,7 @@ NView::TViewDescription DescribeView(NView::TViewClient& client, const TString& auto status = NConsoleClient::RetryFunction([&]() { return client.DescribeView(path).ExtractValueSync(); }); - VerifyStatus(status); + VerifyStatus(status, "describe view to build a backup"); return status.GetViewDescription(); } diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 4255a1cfc2c9..d00e46233032 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1076,5 +1076,6 @@ message TActivity { BS_GROUP_GETBLOCK = 653; HTTP_MON_INDEX_SERVICE = 654; HTTP_MON_AUTHORIZED_ACTOR_REQUEST = 655; + IMPORT_SCHEME_QUERY_EXECUTOR = 656; }; }; diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp index 816a9f5d446f..077db403dc74 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp +++ b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp @@ -20,7 +20,6 @@ #include #include -#include #include @@ -129,72 +128,11 @@ TRestoreResult CombineResults(const TVector& results) { return Result(); } -TString GetBackupRoot(TStringInput query) { - TString backupRoot; - - constexpr TStringBuf targetLinePrefix = "-- backup root: \""; - constexpr TStringBuf discardedSuffix = "\""; - TString line; - while (query.ReadLine(line)) { - if (line.StartsWith(targetLinePrefix)) { - backupRoot = line.substr( - std::size(targetLinePrefix), - std::size(line) - std::size(targetLinePrefix) - std::size(discardedSuffix) - ); - return backupRoot; - } - } - - return backupRoot; -} - bool IsDatabase(TSchemeClient& client, const TString& path) { auto result = DescribePath(client, path); return result.GetStatus() == EStatus::SUCCESS && result.GetEntry().Type == ESchemeEntryType::SubDomain; } -bool RewriteTablePathPrefix(TString& query, TStringBuf backupRoot, TStringBuf restoreRoot, - bool restoreRootIsDatabase, NIssue::TIssues& issues -) { - if (backupRoot == restoreRoot) { - return true; - } - - TString pathPrefix; - if (!re2::RE2::PartialMatch(query, R"(PRAGMA TablePathPrefix = '(\S+)';)", &pathPrefix)) { - if (!restoreRootIsDatabase) { - // Initially, the view used the implicit table path prefix, but this is no longer feasible - // since the restore root is different from the database root. - // Consequently, we must issue an explicit TablePathPrefix pragma to ensure that the reference targets - // maintain the same relative positions to the view's location as they did previously. - - size_t contextRecreationEnd = query.find("CREATE VIEW"); - if (contextRecreationEnd == TString::npos) { - issues.AddIssue(TStringBuilder() << "no create view statement in the query: " << query); - return false; - } - query.insert(contextRecreationEnd, TString( - std::format("PRAGMA TablePathPrefix = '{}';\n", restoreRoot.data()) - )); - } - return true; - } - - pathPrefix = RewriteAbsolutePath(pathPrefix, backupRoot, restoreRoot); - - constexpr TStringBuf pattern = R"(PRAGMA TablePathPrefix = '\S+';)"; - if (!re2::RE2::Replace(&query, pattern, - std::format(R"(PRAGMA TablePathPrefix = '{}';)", pathPrefix.c_str()) - )) { - issues.AddIssue(TStringBuilder() << "query: " << query.Quote() - << " does not contain the pattern: \"" << pattern << "\"" - ); - return false; - } - - return true; -} - } // anonymous namespace NPrivate { @@ -483,31 +421,11 @@ TRestoreResult TRestoreClient::RestoreView( const auto createViewFile = fsPath.Child(NFiles::CreateView().FileName); TString query = TFileInput(createViewFile).ReadAll(); - const auto backupRoot = GetBackupRoot(query); - { - NIssue::TIssues issues; - if (!RewriteTablePathPrefix(query, backupRoot, dbRestoreRoot, IsDatabase(SchemeClient, dbRestoreRoot), issues)) { - // hard fail since we want to avoid silent fails with wrong table path prefixes - return Result(dbPath, TStatus(EStatus::BAD_REQUEST, std::move(issues))); - } - } - { - NYql::TIssues issues; - RewriteTableRefs(query, backupRoot, dbRestoreRoot, issues); - if (!issues.Empty()) { - // soft fail since the only kind of table references that cause issues are evaluated absolute paths - // and they will fail during the query execution anyway - LOG_W(issues.ToOneLineString()); - } - } - - constexpr TStringBuf pattern = R"(CREATE VIEW IF NOT EXISTS `\S+` )"; - if (!re2::RE2::Replace(&query, pattern, std::format(R"(CREATE VIEW IF NOT EXISTS `{}` )", dbPath.c_str()))) { - NIssue::TIssues issues; - issues.AddIssue(TStringBuilder() << "Cannot restore a view from the file: " << createViewFile.GetPath().Quote() - << ". Pattern: \"" << pattern << "\", was not found in the create view statement: " << query.Quote() - ); - return Result(dbPath, TStatus(EStatus::BAD_REQUEST, std::move(issues))); + NYql::TIssues issues; + if (!RewriteCreateViewQuery(query, dbRestoreRoot, IsDatabase(SchemeClient, dbRestoreRoot), dbPath, + createViewFile.GetPath().Quote(), issues + )) { + return Result(dbPath, EStatus::BAD_REQUEST, issues.ToString()); } if (settings.DryRun_) { diff --git a/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp b/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp index 4c61632a87b2..dfd41d5852ea 100644 --- a/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp +++ b/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp @@ -10,13 +10,37 @@ #include #include +#include #include +#include using namespace NSQLv1Generated; namespace { +TString RewriteAbsolutePath(TStringBuf path, TStringBuf backupRoot, TStringBuf restoreRoot) { + if (backupRoot == restoreRoot) { + return TString(path); + } + + TPathSplitUnix pathSplit(path); + TPathSplitUnix backupRootSplit(backupRoot); + + size_t matchedParts = 0; + while (matchedParts < pathSplit.size() && matchedParts < backupRootSplit.size() + && pathSplit[matchedParts] == backupRootSplit[matchedParts] + ) { + ++matchedParts; + } + + TPathSplitUnix restoreRootSplit(restoreRoot); + for (size_t unmatchedParts = matchedParts + 1; unmatchedParts <= backupRootSplit.size(); ++unmatchedParts) { + restoreRootSplit.AppendComponent(".."); + } + return restoreRootSplit.AppendMany(pathSplit.begin() + matchedParts, pathSplit.end()).Reconstruct(); +} + struct TAbsolutePathRewriter { static bool IsAbsolutePath(TStringBuf path) { @@ -29,7 +53,7 @@ struct TAbsolutePathRewriter { } return TStringBuilder() << '`' - << NYdb::NDump::RewriteAbsolutePath(TStringBuf(path.begin() + 1, path.end() - 1), BackupRoot, RestoreRoot) + << ::RewriteAbsolutePath(TStringBuf(path.begin() + 1, path.end() - 1), BackupRoot, RestoreRoot) << '`'; } @@ -155,6 +179,26 @@ TString RewriteTableRefs(const TRule_sql_query& query, TStringBuf backupRoot, TS return tokenCollector.Tokens; } +struct TViewQuerySplit { + TString ContextRecreation; + TString Select; +}; + +TViewQuerySplit SplitViewQuery(TStringInput query) { + // to do: make the implementation more versatile + TViewQuerySplit split; + + TString line; + while (query.ReadLine(line)) { + (line.StartsWith("--") || line.StartsWith("PRAGMA ") + ? split.ContextRecreation + : split.Select + ) += line; + } + + return split; +} + bool SqlToProtoAst(const TString& query, TRule_sql_query& queryProto, NYql::TIssues& issues) { NSQLTranslation::TTranslationSettings settings; if (!NSQLTranslation::ParseTranslationSettings(query, settings, issues)) { @@ -185,30 +229,50 @@ bool ValidateViewQuery(const TString& query, NYql::TIssues& issues) { return ValidateTableRefs(queryProto, issues); } +void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& issues) { + NYql::TIssues subIssues; + if (!ValidateViewQuery(query, subIssues)) { + NYql::TIssue restorabilityIssue( + TStringBuilder() << "Restorability of the view: " << dbPath.Quote() + << " storing the following query:\n" + << query + << "\ncannot be guaranteed. For more information, please refer to the 'ydb tools dump' documentation." + ); + restorabilityIssue.Severity = NYql::TSeverityIds::S_WARNING; + for (const auto& subIssue : subIssues) { + restorabilityIssue.AddSubIssue(MakeIntrusive(subIssue)); + } + issues.AddIssue(std::move(restorabilityIssue)); + } } -namespace NYdb::NDump { +TString GetBackupRoot(TStringInput query) { + TString backupRoot; -TString RewriteAbsolutePath(TStringBuf path, TStringBuf backupRoot, TStringBuf restoreRoot) { - if (backupRoot == restoreRoot) { - return TString(path); + constexpr TStringBuf targetLinePrefix = "-- backup root: \""; + constexpr TStringBuf discardedSuffix = "\""; + TString line; + while (query.ReadLine(line)) { + StripInPlace(line); + if (line.StartsWith(targetLinePrefix)) { + backupRoot = line.substr( + std::size(targetLinePrefix), + std::size(line) - std::size(targetLinePrefix) - std::size(discardedSuffix) + ); + return backupRoot; + } } - TPathSplitUnix pathSplit(path); - TPathSplitUnix backupRootSplit(backupRoot); + return backupRoot; +} - size_t matchedParts = 0; - while (matchedParts < pathSplit.size() && matchedParts < backupRootSplit.size() - && pathSplit[matchedParts] == backupRootSplit[matchedParts] - ) { - ++matchedParts; - } +bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues) { + google::protobuf::Arena arena; + NSQLTranslation::TTranslationSettings settings; + settings.Arena = &arena; - TPathSplitUnix restoreRootSplit(restoreRoot); - for (size_t unmatchedParts = matchedParts + 1; unmatchedParts <= backupRootSplit.size(); ++unmatchedParts) { - restoreRootSplit.AppendComponent(".."); - } - return restoreRootSplit.AppendMany(pathSplit.begin() + matchedParts, pathSplit.end()).Reconstruct(); + auto formatter = NSQLFormat::MakeSqlFormatter(settings); + return formatter->Format(query, formattedQuery, issues); } bool RewriteTableRefs(TString& query, TStringBuf backupRoot, TStringBuf restoreRoot, NYql::TIssues& issues) { @@ -216,7 +280,7 @@ bool RewriteTableRefs(TString& query, TStringBuf backupRoot, TStringBuf restoreR if (!SqlToProtoAst(query, queryProto, issues)) { return false; } - const auto rewrittenQuery = ::RewriteTableRefs(queryProto, backupRoot, restoreRoot); + const auto rewrittenQuery = RewriteTableRefs(queryProto, backupRoot, restoreRoot); // formatting here is necessary for the view to have pretty text inside it after the creation if (!Format(rewrittenQuery, query, issues)) { return false; @@ -224,47 +288,52 @@ bool RewriteTableRefs(TString& query, TStringBuf backupRoot, TStringBuf restoreR return true; } -TViewQuerySplit SplitViewQuery(TStringInput query) { - // to do: make the implementation more versatile - TViewQuerySplit split; +bool RewriteTablePathPrefix(TString& query, TStringBuf backupRoot, TStringBuf restoreRoot, + bool restoreRootIsDatabase, NYql::TIssues& issues +) { + if (backupRoot == restoreRoot) { + return true; + } - TString line; - while (query.ReadLine(line)) { - (line.StartsWith("--") || line.StartsWith("PRAGMA ") - ? split.ContextRecreation - : split.Select - ) += line; + TString pathPrefix; + if (!re2::RE2::PartialMatch(query, R"(PRAGMA TablePathPrefix = '(\S+)';)", &pathPrefix)) { + if (!restoreRootIsDatabase) { + // Initially, the view relied on the implicit table path prefix; + // however, this approach is now incorrect because the requested restore root differs from the database root. + // We need to explicitly set the TablePathPrefix pragma to ensure that the reference targets + // keep the same relative positions to the view's location as before. + + size_t contextRecreationEnd = query.find("CREATE VIEW"); + if (contextRecreationEnd == TString::npos) { + issues.AddIssue(TStringBuilder() << "no create view statement in the query: " << query); + return false; + } + query.insert(contextRecreationEnd, TString( + std::format("PRAGMA TablePathPrefix = '{}';\n", restoreRoot.data()) + )); + } + return true; } - return split; -} + pathPrefix = RewriteAbsolutePath(pathPrefix, backupRoot, restoreRoot); -void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& issues) { - NYql::TIssues subIssues; - if (!::ValidateViewQuery(query, subIssues)) { - NYql::TIssue restorabilityIssue( - TStringBuilder() << "Restorability of the view: " << dbPath.Quote() - << " storing the following query:\n" - << query - << "\ncannot be guaranteed. For more information, please refer to the 'ydb tools dump' documentation." + constexpr TStringBuf pattern = R"(PRAGMA TablePathPrefix = '\S+';)"; + if (!re2::RE2::Replace(&query, pattern, + std::format(R"(PRAGMA TablePathPrefix = '{}';)", pathPrefix.c_str()) + )) { + issues.AddIssue(TStringBuilder() << "query: " << query.Quote() + << " does not contain the pattern: \"" << pattern << "\"" ); - restorabilityIssue.Severity = NYql::TSeverityIds::S_WARNING; - for (const auto& subIssue : subIssues) { - restorabilityIssue.AddSubIssue(MakeIntrusive(subIssue)); - } - issues.AddIssue(std::move(restorabilityIssue)); + return false; } -} -bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues) { - google::protobuf::Arena arena; - NSQLTranslation::TTranslationSettings settings; - settings.Arena = &arena; + return true; +} - auto formatter = NSQLFormat::MakeSqlFormatter(settings); - return formatter->Format(query, formattedQuery, issues); } +namespace NYdb::NDump { + TString BuildCreateViewQuery( const TString& name, const TString& dbPath, const TString& viewQuery, const TString& backupRoot, NYql::TIssues& issues @@ -291,4 +360,29 @@ TString BuildCreateViewQuery( return formattedQuery; } +bool RewriteCreateViewQuery(TString& query, const TString& restoreRoot, bool restoreRootIsDatabase, + const TString& dbPath, const TString& source, NYql::TIssues& issues +) { + const auto backupRoot = GetBackupRoot(query); + + if (!RewriteTablePathPrefix(query, backupRoot, restoreRoot, restoreRootIsDatabase, issues)) { + return false; + } + + if (!RewriteTableRefs(query, backupRoot, restoreRoot, issues)) { + return false; + } + + constexpr TStringBuf pattern = R"(CREATE VIEW IF NOT EXISTS `\S+` )"; + if (!re2::RE2::Replace(&query, pattern, std::format(R"(CREATE VIEW IF NOT EXISTS `{}` )", dbPath.c_str()))) { + issues.AddIssue(TStringBuilder() + << "Cannot restore a view from the source: " << source + << ". Pattern: \"" << pattern << "\", was not found in the create view statement: " << query.Quote() + ); + return false; + } + + return true; +} + } diff --git a/ydb/public/lib/ydb_cli/dump/util/view_utils.h b/ydb/public/lib/ydb_cli/dump/util/view_utils.h index 2b87a5022e5b..15252d6b294f 100644 --- a/ydb/public/lib/ydb_cli/dump/util/view_utils.h +++ b/ydb/public/lib/ydb_cli/dump/util/view_utils.h @@ -2,25 +2,13 @@ namespace NYdb::NDump { -TString RewriteAbsolutePath(TStringBuf path, TStringBuf backupRoot, TStringBuf restoreRoot); - -bool RewriteTableRefs(TString& scheme, TStringBuf backupRoot, TStringBuf restoreRoot, NYql::TIssues& issues); - -struct TViewQuerySplit { - TString ContextRecreation; - TString Select; -}; - -TViewQuerySplit SplitViewQuery(TStringInput query); - -// returns void, because the validation is non-blocking -void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& issues); - -bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues); - TString BuildCreateViewQuery( const TString& name, const TString& dbPath, const TString& viewQuery, const TString& backupRoot, NYql::TIssues& issues ); +bool RewriteCreateViewQuery(TString& query, const TString& restoreRoot, bool restoreRootIsDatabase, + const TString& dbPath, const TString& source, NYql::TIssues& issues +); + } diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index faf30d9e65dc..8759c64389d1 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6191,6 +6191,16 @@ "ColumnId": 12, "ColumnName": "Metadata", "ColumnType": "String" + }, + { + "ColumnId": 13, + "ColumnName": "CreationQuery", + "ColumnType": "Utf8" + }, + { + "ColumnId": 14, + "ColumnName": "PreparedCreationQuery", + "ColumnType": "String" } ], "ColumnsDropped": [], @@ -6208,7 +6218,9 @@ 9, 10, 11, - 12 + 12, + 13, + 14 ], "RoomID": 0, "Codec": 0,