Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-serialized bootstrap config #9331

Merged
merged 2 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class TMessageBusServerPersQueueRequestTestBase: public TTestBase {
static int version = 0;
++version;

THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
auto request = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
for (size_t i : partitions) {
request->Record.MutableTabletConfig()->AddPartitionIds(i);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2688,7 +2688,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
for (auto& [tabletId, t] : topicTxs) {
auto& transaction = t.tx;

auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();

if (t.hasWrite && writeId.Defined()) {
auto* w = transaction.MutableWriteId();
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/persqueue/events/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,15 @@ struct TEvPersQueue {
TEvResponse() {}
};

struct TEvUpdateConfig: public TEventPB<TEvUpdateConfig,
struct TEvUpdateConfig: public TEventPreSerializedPB<TEvUpdateConfig,
NKikimrPQ::TUpdateConfig, EvUpdateConfig> {
TEvUpdateConfig() {}
};

struct TEvUpdateConfigBuilder: public TEvUpdateConfig {
using TBase::Record;
};

struct TEvUpdateBalancerConfig: public TEventPB<TEvUpdateBalancerConfig,
NKikimrPQ::TUpdateBalancerConfig, EvUpdateBalancerConfig> {
TEvUpdateBalancerConfig() {}
Expand Down Expand Up @@ -245,7 +249,11 @@ struct TEvPersQueue {
{}
};

struct TEvProposeTransaction : public TEventPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
struct TEvProposeTransaction : public TEventPreSerializedPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
};

struct TEvProposeTransactionBuilder: public TEvProposeTransaction {
using TBase::Record;
};

struct TEvProposeTransactionResult : public TEventPB<TEvProposeTransactionResult, NKikimrPQ::TEvProposeTransactionResult, EvProposeTransactionResult> {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ void TPartition::Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorCo

void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
NKikimrPQ::TEvProposeTransaction event = ev->Get()->GetRecord();
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(event.HasData());
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
Expand Down Expand Up @@ -1990,7 +1990,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
return EProcessResult::Continue;
}
t->Predicate.ConstructInPlace(true);
return PreProcessImmediateTx(t->ProposeTransaction->Record);
return PreProcessImmediateTx(t->ProposeTransaction->GetRecord());

} else if (t->Tx) { // Distributed TX
if (t->Predicate.Defined()) { // Predicate defined - either failed previously or Tx created with predicate defined.
Expand Down Expand Up @@ -2573,7 +2573,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
void TPartition::ExecImmediateTx(TTransaction& t)
{
--ImmediateTxCount;
auto& record = t.ProposeTransaction->Record;
auto record = t.ProposeTransaction->GetRecord();
Y_ABORT_UNLESS(record.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(record.HasData());

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ struct TTransaction {
: ProposeTransaction(proposeTx)
, State(ECommitState::Committed)
{
if (proposeTx->Record.HasSupportivePartitionActor()) {
SupportivePartitionActor = ActorIdFromProto(proposeTx->Record.GetSupportivePartitionActor());
auto record = proposeTx->GetRecord();
if (record.HasSupportivePartitionActor()) {
SupportivePartitionActor = ActorIdFromProto(record.GetSupportivePartitionActor());
}
Y_ABORT_UNLESS(ProposeTransaction);
}
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,

void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
{
auto& record = ev->Record;
auto record = ev->GetRecord();

int oldConfigVersion = Config.HasVersion() ? static_cast<int>(Config.GetVersion()) : -1;
int newConfigVersion = NewConfig.HasVersion() ? static_cast<int>(NewConfig.GetVersion()) : oldConfigVersion;
Expand Down Expand Up @@ -3259,9 +3259,9 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co

void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << ev->Get()->Record.ShortDebugString());
NKikimrPQ::TEvProposeTransaction event = ev->Get()->GetRecord();
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << event.ShortDebugString());

NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
switch (event.GetTxBodyCase()) {
case NKikimrPQ::TEvProposeTransaction::kData:
HandleDataTransaction(ev->Release(), ctx);
Expand Down Expand Up @@ -3316,7 +3316,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
const TActorContext& ctx)
{
NKikimrPQ::TEvProposeTransaction& event = ev->Record;
NKikimrPQ::TEvProposeTransaction event = ev->GetRecord();
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(event.HasData());
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
Expand Down Expand Up @@ -3427,7 +3427,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
const TActorContext& ctx)
{
NKikimrPQ::TEvProposeTransaction& event = ev->Record;
NKikimrPQ::TEvProposeTransaction event = ev->GetRecord();
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kConfig);
Y_ABORT_UNLESS(event.HasConfig());

Expand Down Expand Up @@ -3705,7 +3705,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
const auto front = std::move(EvProposeTransactionQueue.front());
EvProposeTransactionQueue.pop_front();

const NKikimrPQ::TEvProposeTransaction& event = front->Record;
NKikimrPQ::TEvProposeTransaction event = front->GetRecord();
TDistributedTransaction& tx = Txs[event.GetTxId()];

switch (tx.State) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/pq_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
try {
runtime.ResetScheduledCount();

THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
auto request = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
for (ui32 i = 0; i < parameters.partitions; ++i) {
request->Record.MutableTabletConfig()->AddPartitionIds(i);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition,
bool immediate,
ui64 txId)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
auto* body = event->Record.MutableData();
Expand Down Expand Up @@ -1606,7 +1606,7 @@ ui64 TPartitionTxTestHelper::MakeAndSendWriteTx(const TSrcIdMap& srcIdsAffected)
ui64 TPartitionTxTestHelper::MakeAndSendImmediateTx(const TSrcIdMap& srcIdsAffected) {
auto actIter = AddWriteTxImpl(srcIdsAffected, NextActId++, 0);

auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
auto* body = event->Record.MutableData();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/pqtablet_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ void TPQTabletFixture::SendToPipe(const TActorId& sender,

void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
THashSet<ui32> partitions;

ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/user_action_processor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition,
bool immediate,
ui64 txId)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

ActorIdToProto(Ctx->Edge, event->Record.MutableSource());
auto* body = event->Record.MutableTxBody();
Expand All @@ -665,7 +665,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition,

void TUserActionProcessorFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

//
// Source
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,15 +695,15 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TString>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
const TString& databasePath,
TTxState::ETxType txType,
const TOperationContext& context)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
event->Record.SetTxId(ui64(txId));
ActorIdToProto(context.SS->SelfId(), event->Record.MutableSourceActor());

Expand All @@ -719,7 +719,7 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
databasePath);
if (bootstrapConfig) {
Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup);
event->Record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(*bootstrapConfig);
event->PreSerializedData += *bootstrapConfig;
}

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
Expand All @@ -734,15 +734,15 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TString>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
const TString& databasePath,
TTxState::ETxType txType,
const TOperationContext& context)
{
auto event = MakeHolder<TEvPersQueue::TEvUpdateConfig>();
auto event = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
event->Record.SetTxId(ui64(txId));

MakePQTabletConfig(context,
Expand All @@ -757,7 +757,7 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
databasePath);
if (bootstrapConfig) {
Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup);
event->Record.MutableBootstrapConfig()->CopyFrom(*bootstrapConfig);
event->PreSerializedData += *bootstrapConfig;
}

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
Expand Down
21 changes: 17 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,19 @@ class TConfigureParts: public TSubOperationState {
}
}

std::optional<TString> preSerializedBootstrapConfig;
if (bootstrapConfig) {
if (context.SS->EnablePQConfigTransactionsAtSchemeShard) {
NKikimrPQ::TEvProposeTransaction proto;
proto.MutableConfig()->MutableBootstrapConfig()->Swap(&*bootstrapConfig);
preSerializedBootstrapConfig.emplace(proto.SerializeAsString());
} else {
NKikimrPQ::TUpdateConfig proto;
proto.MutableBootstrapConfig()->Swap(&*bootstrapConfig);
preSerializedBootstrapConfig.emplace(proto.SerializeAsString());
}
}

for (auto shard : txState->Shards) {
TShardIdx idx = shard.Idx;
TTabletId tabletId = context.SS->ShardInfos.at(idx).TabletID;
Expand All @@ -723,7 +736,7 @@ class TConfigureParts: public TSubOperationState {
*pqShard,
topicName,
topicPath.PathString(),
bootstrapConfig,
preSerializedBootstrapConfig,
cloudId,
folderId,
databaseId,
Expand All @@ -736,7 +749,7 @@ class TConfigureParts: public TSubOperationState {
*pqShard,
topicName,
topicPath.PathString(),
bootstrapConfig,
preSerializedBootstrapConfig,
cloudId,
folderId,
databaseId,
Expand Down Expand Up @@ -918,7 +931,7 @@ class TConfigureParts: public TSubOperationState {
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TString>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
Expand All @@ -931,7 +944,7 @@ class TConfigureParts: public TSubOperationState {
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TString>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
Expand Down
Loading