From 21b9a35c9e7640e73c1426d3028ffa3d07bf29d9 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Mon, 16 Sep 2024 19:46:53 +0300 Subject: [PATCH 1/2] Pre-serialized bootstrap config --- .../msgbus_server_pq_metarequest_ut.cpp | 2 +- .../kqp/executer_actor/kqp_data_executer.cpp | 2 +- ydb/core/persqueue/events/global.h | 12 +++++++++-- ydb/core/persqueue/partition.cpp | 6 +++--- ydb/core/persqueue/partition.h | 5 +++-- ydb/core/persqueue/pq_impl.cpp | 12 +++++------ ydb/core/persqueue/ut/common/pq_ut_common.cpp | 2 +- ydb/core/persqueue/ut/partition_ut.cpp | 4 ++-- ydb/core/persqueue/ut/pqtablet_ut.cpp | 2 +- .../persqueue/ut/user_action_processor_ut.cpp | 4 ++-- .../schemeshard__operation_common.cpp | 12 +++++------ .../schemeshard__operation_common.h | 21 +++++++++++++++---- 12 files changed, 53 insertions(+), 31 deletions(-) diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp index c0f92246a02a..c9c8c77ce265 100644 --- a/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp @@ -169,7 +169,7 @@ class TMessageBusServerPersQueueRequestTestBase: public TTestBase { static int version = 0; ++version; - THolder request(new TEvPersQueue::TEvUpdateConfig()); + auto request = MakeHolder(); for (size_t i : partitions) { request->Record.MutableTabletConfig()->AddPartitionIds(i); } diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 33aa5f2966f8..8bfc90293c6a 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2688,7 +2688,7 @@ class TKqpDataExecuter : public TKqpExecuterBase(); + auto ev = std::make_unique(); if (t.hasWrite && writeId.Defined()) { auto* w = transaction.MutableWriteId(); diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h index 154bd6205b56..b1331a958db6 100644 --- a/ydb/core/persqueue/events/global.h +++ b/ydb/core/persqueue/events/global.h @@ -73,11 +73,15 @@ struct TEvPersQueue { TEvResponse() {} }; - struct TEvUpdateConfig: public TEventPB { TEvUpdateConfig() {} }; + struct TEvUpdateConfigBuilder: public TEvUpdateConfig { + using TBase::Record; + }; + struct TEvUpdateBalancerConfig: public TEventPB { TEvUpdateBalancerConfig() {} @@ -245,7 +249,11 @@ struct TEvPersQueue { {} }; - struct TEvProposeTransaction : public TEventPB { + struct TEvProposeTransaction : public TEventPreSerializedPB { + }; + + struct TEvProposeTransactionBuilder: public TEvProposeTransaction { + using TBase::Record; }; struct TEvProposeTransactionResult : public TEventPB { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index bf216b3034ca..b188aa1e5f3d 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -899,7 +899,7 @@ void TPartition::Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorCo void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { - const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record; + NKikimrPQ::TEvProposeTransaction event = ev->Get()->GetRecord(); Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); Y_ABORT_UNLESS(event.HasData()); const NKikimrPQ::TDataTransaction& txBody = event.GetData(); @@ -1990,7 +1990,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple return EProcessResult::Continue; } t->Predicate.ConstructInPlace(true); - return PreProcessImmediateTx(t->ProposeTransaction->Record); + return PreProcessImmediateTx(t->ProposeTransaction->GetRecord()); } else if (t->Tx) { // Distributed TX if (t->Predicate.Defined()) { // Predicate defined - either failed previously or Tx created with predicate defined. @@ -2573,7 +2573,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE void TPartition::ExecImmediateTx(TTransaction& t) { --ImmediateTxCount; - auto& record = t.ProposeTransaction->Record; + auto record = t.ProposeTransaction->GetRecord(); Y_ABORT_UNLESS(record.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); Y_ABORT_UNLESS(record.HasData()); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 17062cd1bd37..7046f7ee34b3 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -73,8 +73,9 @@ struct TTransaction { : ProposeTransaction(proposeTx) , State(ECommitState::Committed) { - if (proposeTx->Record.HasSupportivePartitionActor()) { - SupportivePartitionActor = ActorIdFromProto(proposeTx->Record.GetSupportivePartitionActor()); + auto record = proposeTx->GetRecord(); + if (record.HasSupportivePartitionActor()) { + SupportivePartitionActor = ActorIdFromProto(record.GetSupportivePartitionActor()); } Y_ABORT_UNLESS(ProposeTransaction); } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 374ec868cc92..985fac8161e4 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1626,7 +1626,7 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config, void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr ev, const TActorId& sender, const TActorContext& ctx) { - auto& record = ev->Record; + auto record = ev->GetRecord(); int oldConfigVersion = Config.HasVersion() ? static_cast(Config.GetVersion()) : -1; int newConfigVersion = NewConfig.HasVersion() ? static_cast(NewConfig.GetVersion()) : oldConfigVersion; @@ -3259,9 +3259,9 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << ev->Get()->Record.ShortDebugString()); + NKikimrPQ::TEvProposeTransaction event = ev->Get()->GetRecord(); + PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << event.ShortDebugString()); - NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record; switch (event.GetTxBodyCase()) { case NKikimrPQ::TEvProposeTransaction::kData: HandleDataTransaction(ev->Release(), ctx); @@ -3316,7 +3316,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod void TPersQueue::HandleDataTransaction(TAutoPtr ev, const TActorContext& ctx) { - NKikimrPQ::TEvProposeTransaction& event = ev->Record; + NKikimrPQ::TEvProposeTransaction event = ev->GetRecord(); Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); Y_ABORT_UNLESS(event.HasData()); const NKikimrPQ::TDataTransaction& txBody = event.GetData(); @@ -3427,7 +3427,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr ev, const TActorContext& ctx) { - NKikimrPQ::TEvProposeTransaction& event = ev->Record; + NKikimrPQ::TEvProposeTransaction event = ev->GetRecord(); Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kConfig); Y_ABORT_UNLESS(event.HasConfig()); @@ -3705,7 +3705,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) const auto front = std::move(EvProposeTransactionQueue.front()); EvProposeTransactionQueue.pop_front(); - const NKikimrPQ::TEvProposeTransaction& event = front->Record; + NKikimrPQ::TEvProposeTransaction event = front->GetRecord(); TDistributedTransaction& tx = Txs[event.GetTxId()]; switch (tx.State) { diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index 0d999407d39d..87bdcbda7653 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -44,7 +44,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, try { runtime.ResetScheduledCount(); - THolder request(new TEvPersQueue::TEvUpdateConfig()); + auto request = MakeHolder(); for (ui32 i = 0; i < parameters.partitions; ++i) { request->Record.MutableTabletConfig()->AddPartitionIds(i); } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 8bccd773e953..1235bb85526a 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -882,7 +882,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition, bool immediate, ui64 txId) { - auto event = MakeHolder(); + auto event = MakeHolder(); ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor()); auto* body = event->Record.MutableData(); @@ -1606,7 +1606,7 @@ ui64 TPartitionTxTestHelper::MakeAndSendWriteTx(const TSrcIdMap& srcIdsAffected) ui64 TPartitionTxTestHelper::MakeAndSendImmediateTx(const TSrcIdMap& srcIdsAffected) { auto actIter = AddWriteTxImpl(srcIdsAffected, NextActId++, 0); - auto event = MakeHolder(); + auto event = MakeHolder(); ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor()); auto* body = event->Record.MutableData(); diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 5c9ba35af4c5..f6388c452904 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -294,7 +294,7 @@ void TPQTabletFixture::SendToPipe(const TActorId& sender, void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params) { - auto event = MakeHolder(); + auto event = MakeHolder(); THashSet partitions; ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor()); diff --git a/ydb/core/persqueue/ut/user_action_processor_ut.cpp b/ydb/core/persqueue/ut/user_action_processor_ut.cpp index c45bc09741bb..0796c620f39b 100644 --- a/ydb/core/persqueue/ut/user_action_processor_ut.cpp +++ b/ydb/core/persqueue/ut/user_action_processor_ut.cpp @@ -647,7 +647,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition, bool immediate, ui64 txId) { - auto event = MakeHolder(); + auto event = MakeHolder(); ActorIdToProto(Ctx->Edge, event->Record.MutableSource()); auto* body = event->Record.MutableTxBody(); @@ -665,7 +665,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition, void TUserActionProcessorFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params) { - auto event = MakeHolder(); + auto event = MakeHolder(); // // Source diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 371e5acaf297..a3c6ad905c42 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -695,7 +695,7 @@ THolder TConfigureParts::MakeEvProposeTrans const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, - const std::optional& bootstrapConfig, + const std::optional& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, @@ -703,7 +703,7 @@ THolder TConfigureParts::MakeEvProposeTrans TTxState::ETxType txType, const TOperationContext& context) { - auto event = MakeHolder(); + auto event = MakeHolder(); event->Record.SetTxId(ui64(txId)); ActorIdToProto(context.SS->SelfId(), event->Record.MutableSourceActor()); @@ -719,7 +719,7 @@ THolder TConfigureParts::MakeEvProposeTrans databasePath); if (bootstrapConfig) { Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup); - event->Record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(*bootstrapConfig); + event->PreSerializedData += *bootstrapConfig; } LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, @@ -734,7 +734,7 @@ THolder TConfigureParts::MakeEvUpdateConfig(TTxId const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, - const std::optional& bootstrapConfig, + const std::optional& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, @@ -742,7 +742,7 @@ THolder TConfigureParts::MakeEvUpdateConfig(TTxId TTxState::ETxType txType, const TOperationContext& context) { - auto event = MakeHolder(); + auto event = MakeHolder(); event->Record.SetTxId(ui64(txId)); MakePQTabletConfig(context, @@ -757,7 +757,7 @@ THolder TConfigureParts::MakeEvUpdateConfig(TTxId databasePath); if (bootstrapConfig) { Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup); - event->Record.MutableBootstrapConfig()->CopyFrom(*bootstrapConfig); + event->PreSerializedData += *bootstrapConfig; } LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 1fc60f6be9d4..ad5e461adaa4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -701,6 +701,19 @@ class TConfigureParts: public TSubOperationState { } } + std::optional preSerializedBootstrapConfig; + if (bootstrapConfig) { + if (context.SS->EnablePQConfigTransactionsAtSchemeShard) { + NKikimrPQ::TEvProposeTransaction proto; + proto.MutableConfig()->MutableBootstrapConfig()->Swap(&*bootstrapConfig); + preSerializedBootstrapConfig.emplace(proto.SerializeAsString()); + } else { + NKikimrPQ::TUpdateConfig proto; + proto.MutableBootstrapConfig()->Swap(&*bootstrapConfig); + preSerializedBootstrapConfig.emplace(proto.SerializeAsString()); + } + } + for (auto shard : txState->Shards) { TShardIdx idx = shard.Idx; TTabletId tabletId = context.SS->ShardInfos.at(idx).TabletID; @@ -723,7 +736,7 @@ class TConfigureParts: public TSubOperationState { *pqShard, topicName, topicPath.PathString(), - bootstrapConfig, + preSerializedBootstrapConfig, cloudId, folderId, databaseId, @@ -736,7 +749,7 @@ class TConfigureParts: public TSubOperationState { *pqShard, topicName, topicPath.PathString(), - bootstrapConfig, + preSerializedBootstrapConfig, cloudId, folderId, databaseId, @@ -918,7 +931,7 @@ class TConfigureParts: public TSubOperationState { const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, - const std::optional& bootstrapConfig, + const std::optional& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, @@ -931,7 +944,7 @@ class TConfigureParts: public TSubOperationState { const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, - const std::optional& bootstrapConfig, + const std::optional& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, From 24bee3b1a88ebad19d50e7e22fef27ad6a4467cc Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Mon, 16 Sep 2024 22:51:01 +0300 Subject: [PATCH 2/2] Fix review issues --- ydb/core/persqueue/partition.cpp | 6 +- ydb/core/persqueue/partition.h | 2 +- ydb/core/persqueue/pq_impl.cpp | 10 +-- .../schemeshard__operation_common.cpp | 8 +-- .../schemeshard__operation_common.h | 72 ++++++++++++++----- 5 files changed, 66 insertions(+), 32 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index b188aa1e5f3d..f0694691f0a5 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -899,7 +899,7 @@ void TPartition::Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorCo void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { - NKikimrPQ::TEvProposeTransaction event = ev->Get()->GetRecord(); + const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord(); Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); Y_ABORT_UNLESS(event.HasData()); const NKikimrPQ::TDataTransaction& txBody = event.GetData(); @@ -2573,7 +2573,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE void TPartition::ExecImmediateTx(TTransaction& t) { --ImmediateTxCount; - auto record = t.ProposeTransaction->GetRecord(); + const auto& record = t.ProposeTransaction->GetRecord(); Y_ABORT_UNLESS(record.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); Y_ABORT_UNLESS(record.HasData()); @@ -2586,7 +2586,7 @@ void TPartition::ExecImmediateTx(TTransaction& t) t.Message); return; } - for (auto& operation : record.GetData().GetOperations()) { + for (const auto& operation : record.GetData().GetOperations()) { if (!operation.HasBegin() || !operation.HasEnd() || !operation.HasConsumer()) { continue; //Write operation - handled separately via WriteInfo } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 7046f7ee34b3..c954012caa66 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -73,7 +73,7 @@ struct TTransaction { : ProposeTransaction(proposeTx) , State(ECommitState::Committed) { - auto record = proposeTx->GetRecord(); + const auto& record = proposeTx->GetRecord(); if (record.HasSupportivePartitionActor()) { SupportivePartitionActor = ActorIdFromProto(record.GetSupportivePartitionActor()); } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 985fac8161e4..b884160ad3f3 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1626,7 +1626,7 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config, void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr ev, const TActorId& sender, const TActorContext& ctx) { - auto record = ev->GetRecord(); + const auto& record = ev->GetRecord(); int oldConfigVersion = Config.HasVersion() ? static_cast(Config.GetVersion()) : -1; int newConfigVersion = NewConfig.HasVersion() ? static_cast(NewConfig.GetVersion()) : oldConfigVersion; @@ -3259,7 +3259,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { - NKikimrPQ::TEvProposeTransaction event = ev->Get()->GetRecord(); + const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord(); PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << event.ShortDebugString()); switch (event.GetTxBodyCase()) { @@ -3316,7 +3316,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod void TPersQueue::HandleDataTransaction(TAutoPtr ev, const TActorContext& ctx) { - NKikimrPQ::TEvProposeTransaction event = ev->GetRecord(); + NKikimrPQ::TEvProposeTransaction& event = *ev->MutableRecord(); Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); Y_ABORT_UNLESS(event.HasData()); const NKikimrPQ::TDataTransaction& txBody = event.GetData(); @@ -3427,7 +3427,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr ev, const TActorContext& ctx) { - NKikimrPQ::TEvProposeTransaction event = ev->GetRecord(); + const NKikimrPQ::TEvProposeTransaction& event = ev->GetRecord(); Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kConfig); Y_ABORT_UNLESS(event.HasConfig()); @@ -3705,7 +3705,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) const auto front = std::move(EvProposeTransactionQueue.front()); EvProposeTransactionQueue.pop_front(); - NKikimrPQ::TEvProposeTransaction event = front->GetRecord(); + const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord(); TDistributedTransaction& tx = Txs[event.GetTxId()]; switch (tx.State) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index a3c6ad905c42..bdfd80bc99a7 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -695,7 +695,7 @@ THolder TConfigureParts::MakeEvProposeTrans const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, - const std::optional& bootstrapConfig, + const std::optional& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, @@ -719,7 +719,7 @@ THolder TConfigureParts::MakeEvProposeTrans databasePath); if (bootstrapConfig) { Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup); - event->PreSerializedData += *bootstrapConfig; + event->PreSerializedData += bootstrapConfig->GetPreSerializedProposeTransaction(); } LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, @@ -734,7 +734,7 @@ THolder TConfigureParts::MakeEvUpdateConfig(TTxId const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, - const std::optional& bootstrapConfig, + const std::optional& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, @@ -757,7 +757,7 @@ THolder TConfigureParts::MakeEvUpdateConfig(TTxId databasePath); if (bootstrapConfig) { Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup); - event->PreSerializedData += *bootstrapConfig; + event->PreSerializedData += bootstrapConfig->GetPreSerializedUpdateConfig(); } LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index ad5e461adaa4..1f8248f292ac 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -552,6 +552,54 @@ class TDone: public TSubOperationState { namespace NPQState { +class TBootstrapConfigWrapper: public NKikimrPQ::TBootstrapConfig { + struct TSerializedProposeTransaction { + TString Value; + + static TSerializedProposeTransaction Serialize(const NKikimrPQ::TBootstrapConfig& value) { + NKikimrPQ::TEvProposeTransaction record; + record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(value); + return {record.SerializeAsString()}; + } + }; + + struct TSerializedUpdateConfig { + TString Value; + + static TSerializedUpdateConfig Serialize(const NKikimrPQ::TBootstrapConfig& value) { + NKikimrPQ::TUpdateConfig record; + record.MutableBootstrapConfig()->CopyFrom(value); + return {record.SerializeAsString()}; + } + }; + + mutable std::optional> PreSerialized; + + template + const TString& Get() const { + if (!PreSerialized) { + PreSerialized.emplace(T::Serialize(*this)); + } + + const auto* value = std::get_if(&PreSerialized.value()); + Y_ABORT_UNLESS(value); + + return value->Value; + } + +public: + const TString& GetPreSerializedProposeTransaction() const { + return Get(); + } + + const TString& GetPreSerializedUpdateConfig() const { + return Get(); + } +}; + class TConfigureParts: public TSubOperationState { private: TOperationId OperationId; @@ -627,7 +675,6 @@ class TConfigureParts: public TSubOperationState { return false; } - bool ProgressState(TOperationContext& context) override { TTabletId ssId = context.SS->SelfTabletId(); @@ -669,7 +716,7 @@ class TConfigureParts: public TSubOperationState { TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString(); auto topicPath = TPath::Init(txState->TargetPathId, context.SS); - std::optional bootstrapConfig; + std::optional bootstrapConfig; if (txState->TxType == TTxState::TxCreatePQGroup && topicPath.Parent().IsCdcStream()) { bootstrapConfig.emplace(); @@ -701,19 +748,6 @@ class TConfigureParts: public TSubOperationState { } } - std::optional preSerializedBootstrapConfig; - if (bootstrapConfig) { - if (context.SS->EnablePQConfigTransactionsAtSchemeShard) { - NKikimrPQ::TEvProposeTransaction proto; - proto.MutableConfig()->MutableBootstrapConfig()->Swap(&*bootstrapConfig); - preSerializedBootstrapConfig.emplace(proto.SerializeAsString()); - } else { - NKikimrPQ::TUpdateConfig proto; - proto.MutableBootstrapConfig()->Swap(&*bootstrapConfig); - preSerializedBootstrapConfig.emplace(proto.SerializeAsString()); - } - } - for (auto shard : txState->Shards) { TShardIdx idx = shard.Idx; TTabletId tabletId = context.SS->ShardInfos.at(idx).TabletID; @@ -736,7 +770,7 @@ class TConfigureParts: public TSubOperationState { *pqShard, topicName, topicPath.PathString(), - preSerializedBootstrapConfig, + bootstrapConfig, cloudId, folderId, databaseId, @@ -749,7 +783,7 @@ class TConfigureParts: public TSubOperationState { *pqShard, topicName, topicPath.PathString(), - preSerializedBootstrapConfig, + bootstrapConfig, cloudId, folderId, databaseId, @@ -931,7 +965,7 @@ class TConfigureParts: public TSubOperationState { const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, - const std::optional& bootstrapConfig, + const std::optional& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, @@ -944,7 +978,7 @@ class TConfigureParts: public TSubOperationState { const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, - const std::optional& bootstrapConfig, + const std::optional& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId,