Skip to content

Commit

Permalink
try fix reboot
Browse files Browse the repository at this point in the history
  • Loading branch information
stanislav-shchetinin committed Feb 20, 2025
1 parent f1aebbe commit fa6de5c
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 16 deletions.
17 changes: 15 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -643,12 +643,13 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase

Y_ABORT_UNLESS(item.State == EState::CreateChangefeed);
Y_ABORT_UNLESS(item.DstPathId);
Y_ABORT_UNLESS(item.StreamImplPath);

if (!Self->PathsById.contains(item.DstPathId)) {
if (!Self->PathsById.contains(item.StreamImplPath)) {
return InvalidTxId;
}

auto path = Self->PathsById.at(item.DstPathId);
auto path = Self->PathsById.at(item.StreamImplPath);
if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
return InvalidTxId;
}
Expand Down Expand Up @@ -1149,8 +1150,20 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
} else if (item.State == EState::CreateChangefeed) {
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
AllocateTxId(importInfo, itemIdx);
// return;
}
} else {
txId = GetActiveCreateConsumerTxId(importInfo, itemIdx);
if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists) {
if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
}
// return;
}
}

}
Expand Down
15 changes: 5 additions & 10 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
TSchemeShard* ss,
TTxId txId,
const TImportInfo::TItem& item
TImportInfo::TItem& item
) {
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());

Expand All @@ -309,26 +309,21 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
auto& pqGroup = *modifyScheme.MutableAlterPersQueueGroup();

const TPath dstPath = TPath::Init(item.DstPathId, ss);
modifyScheme.SetWorkingDir(dstPath.PathString() + "/" + importChangefeedTopic.GetChangefeed().name());
const TString changefeedPath = dstPath.PathString() + "/" + importChangefeedTopic.GetChangefeed().name();
modifyScheme.SetWorkingDir(changefeedPath);
modifyScheme.SetInternal(true);

pqGroup.SetName("streamImpl");

auto describeSchemeResult = DescribePath(ss, TlsActivationContext->AsActorContext(),
dstPath.PathString() + "/" + importChangefeedTopic.GetChangefeed().name() + "/streamImpl");
auto describeSchemeResult = DescribePath(ss, TlsActivationContext->AsActorContext(),changefeedPath + "/streamImpl");

const auto& response = describeSchemeResult->GetRecord().GetPathDescription();
item.StreamImplPath = {response.GetSelf().GetSchemeshardId(), response.GetSelf().GetPathId()};
pqGroup.CopyFrom(response.GetPersQueueGroup());

pqGroup.ClearTotalGroupCount();
pqGroup.MutablePQTabletConfig()->ClearPartitionKeySchema();

{
auto applyIf = modifyScheme.AddApplyIf();
applyIf->SetPathId(response.GetSelf().GetPathId());
applyIf->SetPathVersion(response.GetSelf().GetPathVersion());
}

auto* tabletConfig = pqGroup.MutablePQTabletConfig();
const auto& pqConfig = AppData()->PQConfig;
auto serviceTypes = NGRpcProxy::V1::GetSupportedClientServiceTypes(pqConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
TSchemeShard* ss,
TTxId txId,
const TImportInfo::TItem& item
TImportInfo::TItem& item
);

} // NSchemeShard
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2869,6 +2869,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
int NextChangefeedIdx = 0;
TString Issue;
int ViewCreationRetries = 0;
NKikimr::TPathId StreamImplPath;

TItem() = default;

Expand Down
4 changes: 1 addition & 3 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,9 +977,7 @@ TCheckFunc ConsumerExist(const TString& name) {
break;
}
}
if (!isExist) {
UNIT_ASSERT(false);
}
UNIT_ASSERT(isExist);
};
}

Expand Down

0 comments on commit fa6de5c

Please sign in to comment.