Skip to content

Commit

Permalink
try
Browse files Browse the repository at this point in the history
  • Loading branch information
stanislav-shchetinin committed Feb 19, 2025
1 parent f3ca9e4 commit f50eccb
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 0 deletions.
29 changes: 29 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item));
}

void CreateConsumers(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
item.SubState = ESubState::Proposed;

LOG_I("TImport::TTxProgress: CreateConsumers propose"
<< ": info# " << importInfo->ToString()
<< ", item# " << item.ToString(itemIdx)
<< ", txId# " << txId);

Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);

Send(Self->SelfId(), CreateConsumersPropose(Self, txId, item));
}

void AllocateTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -1048,6 +1063,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
CreateChangefeed(importInfo, i, txId);
itemIdx = i;
break;

case EState::CreateConsumers:
CreateConsumers(importInfo, i, txId);
itemIdx = i;
break;

default:
break;
Expand Down Expand Up @@ -1292,10 +1312,19 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
case EState::CreateChangefeed:
if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
AllocateTxId(importInfo, itemIdx);
item.State = EState::CreateConsumers;
} else {
item.State = EState::Done;
}
break;

case EState::CreateConsumers:
if (item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
AllocateTxId(importInfo, itemIdx);
item.State = EState::CreateChangefeed;
} else {
item.State = EState::Done;
}

default:
return SendNotificationsIfFinished(importInfo);
Expand Down
30 changes: 30 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,5 +291,35 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
return propose;
}

THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
TSchemeShard* ss,
TTxId txId,
const TImportInfo::TItem& item
) {
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());

const auto& importChangefeedTopic = item.Changefeeds.GetChangefeeds()[item.NextChangefeedIdx];
const auto& topic = importChangefeedTopic.GetTopic();

auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
auto& record = propose->Record;
auto& modifyScheme = *record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
auto& pqGroup = *modifyScheme.MutableAlterPersQueueGroup();

const TPath dstPath = TPath::Init(item.DstPathId, ss);
Cerr << "pqGroup working dir: " << dstPath.Parent().PathString() << Endl;
Cerr << "pqGroup name: " << dstPath.LeafName() << Endl;
modifyScheme.SetWorkingDir(dstPath.Parent().PathString());
pqGroup.SetName(dstPath.LeafName());

for (const auto& consumer : topic.consumers()) {
auto pqConsumer = *pqGroup.MutablePQTabletConfig()->AddConsumers();
*pqConsumer.MutableName() = consumer.name();
}

return propose;
}

} // NSchemeShard
} // NKikimr
6 changes: 6 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
const TImportInfo::TItem& item
);

THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
TSchemeShard* ss,
TTxId txId,
const TImportInfo::TItem& item
);

} // NSchemeShard
} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2829,6 +2829,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
Transferring,
BuildIndexes,
CreateChangefeed,
CreateConsumers,
Done = 240,
Cancellation = 250,
Cancelled = 251,
Expand Down

0 comments on commit f50eccb

Please sign in to comment.