From 5a159f3e02b468e9bd69584360848182a2421d92 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Fri, 21 Feb 2025 19:35:57 +0300 Subject: [PATCH] fix review --- .../schemeshard_import__create.cpp | 35 ++++++++----------- .../tx/schemeshard/schemeshard_info_types.h | 2 +- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index 1188c6017724..aae0c56a0853 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -642,7 +642,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase const auto& item = importInfo->Items.at(itemIdx); Y_ABORT_UNLESS(item.State == EState::CreateChangefeed); - Y_ABORT_UNLESS(item.DstPathId); + Y_ABORT_UNLESS(item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateConsumers); Y_ABORT_UNLESS(item.StreamImplPathId); if (!Self->PathsById.contains(item.StreamImplPathId)) { @@ -851,14 +851,6 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase TTxId txId = InvalidTxId; switch (item.State) { - case EState::CreateChangefeed: - if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { - txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx); - } else { - txId = GetActiveCreateConsumerTxId(importInfo, itemIdx); - } - break; - case EState::Transferring: if (!CancelTransferring(importInfo, itemIdx)) { txId = GetActiveRestoreTxId(importInfo, itemIdx); @@ -1161,20 +1153,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } - if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) { - if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { - item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers; - AllocateTxId(importInfo, itemIdx); - } else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) { - item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed; - AllocateTxId(importInfo, itemIdx); - } else { - item.State = EState::Done; + if (txId == InvalidTxId) { + + if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) { + if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { + item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers; + AllocateTxId(importInfo, itemIdx); + } else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) { + item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed; + AllocateTxId(importInfo, itemIdx); + } else { + item.State = EState::Done; + } + return; } - return; - } - if (txId == InvalidTxId) { return CancelAndPersist(db, importInfo, itemIdx, record.GetReason(), "unhappy propose"); } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index f49ed899796d..8eb0027e1960 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2869,7 +2869,7 @@ struct TImportInfo: public TSimpleRefCount { int NextChangefeedIdx = 0; TString Issue; int ViewCreationRetries = 0; - NKikimr::TPathId StreamImplPathId; + TPathId StreamImplPathId; TItem() = default;