From fa6de5c50954fbd6164d8e86e6e10f98f7985884 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Fri, 21 Feb 2025 02:40:16 +0300 Subject: [PATCH] try fix reboot --- .../schemeshard/schemeshard_import__create.cpp | 17 +++++++++++++++-- .../schemeshard_import_flow_proposals.cpp | 15 +++++---------- .../schemeshard_import_flow_proposals.h | 2 +- .../tx/schemeshard/schemeshard_info_types.h | 1 + .../tx/schemeshard/ut_helpers/ls_checks.cpp | 4 +--- 5 files changed, 23 insertions(+), 16 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index 1f3d423ea99e..1a4b754662d4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -643,12 +643,13 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase Y_ABORT_UNLESS(item.State == EState::CreateChangefeed); Y_ABORT_UNLESS(item.DstPathId); + Y_ABORT_UNLESS(item.StreamImplPath); - if (!Self->PathsById.contains(item.DstPathId)) { + if (!Self->PathsById.contains(item.StreamImplPath)) { return InvalidTxId; } - auto path = Self->PathsById.at(item.DstPathId); + auto path = Self->PathsById.at(item.StreamImplPath); if (path->PathState != NKikimrSchemeOp::EPathStateAlter) { return InvalidTxId; } @@ -1149,8 +1150,20 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } else if (item.State == EState::CreateChangefeed) { if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx); + if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists) { + item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers; + AllocateTxId(importInfo, itemIdx); + // return; + } } else { txId = GetActiveCreateConsumerTxId(importInfo, itemIdx); + if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists) { + if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) { + item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed; + AllocateTxId(importInfo, itemIdx); + } + // return; + } } } diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp index 48aee0757e30..0944fcafac95 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp @@ -295,7 +295,7 @@ THolder CreateChangefeedPropose( THolder CreateConsumersPropose( TSchemeShard* ss, TTxId txId, - const TImportInfo::TItem& item + TImportInfo::TItem& item ) { Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()); @@ -309,26 +309,21 @@ THolder CreateConsumersPropose( auto& pqGroup = *modifyScheme.MutableAlterPersQueueGroup(); const TPath dstPath = TPath::Init(item.DstPathId, ss); - modifyScheme.SetWorkingDir(dstPath.PathString() + "/" + importChangefeedTopic.GetChangefeed().name()); + const TString changefeedPath = dstPath.PathString() + "/" + importChangefeedTopic.GetChangefeed().name(); + modifyScheme.SetWorkingDir(changefeedPath); modifyScheme.SetInternal(true); pqGroup.SetName("streamImpl"); - auto describeSchemeResult = DescribePath(ss, TlsActivationContext->AsActorContext(), - dstPath.PathString() + "/" + importChangefeedTopic.GetChangefeed().name() + "/streamImpl"); + auto describeSchemeResult = DescribePath(ss, TlsActivationContext->AsActorContext(),changefeedPath + "/streamImpl"); const auto& response = describeSchemeResult->GetRecord().GetPathDescription(); + item.StreamImplPath = {response.GetSelf().GetSchemeshardId(), response.GetSelf().GetPathId()}; pqGroup.CopyFrom(response.GetPersQueueGroup()); pqGroup.ClearTotalGroupCount(); pqGroup.MutablePQTabletConfig()->ClearPartitionKeySchema(); - { - auto applyIf = modifyScheme.AddApplyIf(); - applyIf->SetPathId(response.GetSelf().GetPathId()); - applyIf->SetPathVersion(response.GetSelf().GetPathVersion()); - } - auto* tabletConfig = pqGroup.MutablePQTabletConfig(); const auto& pqConfig = AppData()->PQConfig; auto serviceTypes = NGRpcProxy::V1::GetSupportedClientServiceTypes(pqConfig); diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h index 77d7ddf3270d..107db1b34c47 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h @@ -55,7 +55,7 @@ THolder CreateChangefeedPropose( THolder CreateConsumersPropose( TSchemeShard* ss, TTxId txId, - const TImportInfo::TItem& item + TImportInfo::TItem& item ); } // NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 7f5cef7dc51b..0cba5242ec2b 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2869,6 +2869,7 @@ struct TImportInfo: public TSimpleRefCount { int NextChangefeedIdx = 0; TString Issue; int ViewCreationRetries = 0; + NKikimr::TPathId StreamImplPath; TItem() = default; diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index 39cec0ee0185..6775f1d6a2de 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -977,9 +977,7 @@ TCheckFunc ConsumerExist(const TString& name) { break; } } - if (!isExist) { - UNIT_ASSERT(false); - } + UNIT_ASSERT(isExist); }; }