Skip to content

Commit

Permalink
fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
stanislav-shchetinin committed Feb 21, 2025
1 parent ffd888a commit 5a159f3
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 22 deletions.
35 changes: 14 additions & 21 deletions ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
const auto& item = importInfo->Items.at(itemIdx);

Y_ABORT_UNLESS(item.State == EState::CreateChangefeed);
Y_ABORT_UNLESS(item.DstPathId);
Y_ABORT_UNLESS(item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateConsumers);
Y_ABORT_UNLESS(item.StreamImplPathId);

if (!Self->PathsById.contains(item.StreamImplPathId)) {
Expand Down Expand Up @@ -851,14 +851,6 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
TTxId txId = InvalidTxId;

switch (item.State) {
case EState::CreateChangefeed:
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
} else {
txId = GetActiveCreateConsumerTxId(importInfo, itemIdx);
}
break;

case EState::Transferring:
if (!CancelTransferring(importInfo, itemIdx)) {
txId = GetActiveRestoreTxId(importInfo, itemIdx);
Expand Down Expand Up @@ -1161,20 +1153,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}
}

if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) {
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
AllocateTxId(importInfo, itemIdx);
} else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
if (txId == InvalidTxId) {

if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) {
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
AllocateTxId(importInfo, itemIdx);
} else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
}
return;
}
return;
}

if (txId == InvalidTxId) {
return CancelAndPersist(db, importInfo, itemIdx, record.GetReason(), "unhappy propose");
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2869,7 +2869,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
int NextChangefeedIdx = 0;
TString Issue;
int ViewCreationRetries = 0;
NKikimr::TPathId StreamImplPathId;
TPathId StreamImplPathId;

TItem() = default;

Expand Down

0 comments on commit 5a159f3

Please sign in to comment.