Skip to content

Commit

Permalink
Pre-serialized bootstrap config (#9331)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Sep 17, 2024
1 parent 2d9c88c commit e8c3843
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 32 deletions.
2 changes: 1 addition & 1 deletion ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class TMessageBusServerPersQueueRequestTestBase: public TTestBase {
static int version = 0;
++version;

THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
auto request = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
for (size_t i : partitions) {
request->Record.MutableTabletConfig()->AddPartitionIds(i);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2695,7 +2695,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
for (auto& [tabletId, t] : topicTxs) {
auto& transaction = t.tx;

auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();

if (t.hasWrite && writeId.Defined()) {
auto* w = transaction.MutableWriteId();
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/persqueue/events/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,15 @@ struct TEvPersQueue {
TEvResponse() {}
};

struct TEvUpdateConfig: public TEventPB<TEvUpdateConfig,
struct TEvUpdateConfig: public TEventPreSerializedPB<TEvUpdateConfig,
NKikimrPQ::TUpdateConfig, EvUpdateConfig> {
TEvUpdateConfig() {}
};

struct TEvUpdateConfigBuilder: public TEvUpdateConfig {
using TBase::Record;
};

struct TEvUpdateBalancerConfig: public TEventPB<TEvUpdateBalancerConfig,
NKikimrPQ::TUpdateBalancerConfig, EvUpdateBalancerConfig> {
TEvUpdateBalancerConfig() {}
Expand Down Expand Up @@ -245,7 +249,11 @@ struct TEvPersQueue {
{}
};

struct TEvProposeTransaction : public TEventPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
struct TEvProposeTransaction : public TEventPreSerializedPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
};

struct TEvProposeTransactionBuilder: public TEvProposeTransaction {
using TBase::Record;
};

struct TEvProposeTransactionResult : public TEventPB<TEvProposeTransactionResult, NKikimrPQ::TEvProposeTransactionResult, EvProposeTransactionResult> {
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ void TPartition::Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorCo

void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(event.HasData());
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
Expand Down Expand Up @@ -1990,7 +1990,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
return EProcessResult::Continue;
}
t->Predicate.ConstructInPlace(true);
return PreProcessImmediateTx(t->ProposeTransaction->Record);
return PreProcessImmediateTx(t->ProposeTransaction->GetRecord());

} else if (t->Tx) { // Distributed TX
if (t->Predicate.Defined()) { // Predicate defined - either failed previously or Tx created with predicate defined.
Expand Down Expand Up @@ -2573,7 +2573,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
void TPartition::ExecImmediateTx(TTransaction& t)
{
--ImmediateTxCount;
auto& record = t.ProposeTransaction->Record;
const auto& record = t.ProposeTransaction->GetRecord();
Y_ABORT_UNLESS(record.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(record.HasData());

Expand All @@ -2586,7 +2586,7 @@ void TPartition::ExecImmediateTx(TTransaction& t)
t.Message);
return;
}
for (auto& operation : record.GetData().GetOperations()) {
for (const auto& operation : record.GetData().GetOperations()) {
if (!operation.HasBegin() || !operation.HasEnd() || !operation.HasConsumer()) {
continue; //Write operation - handled separately via WriteInfo
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ struct TTransaction {
: ProposeTransaction(proposeTx)
, State(ECommitState::Committed)
{
if (proposeTx->Record.HasSupportivePartitionActor()) {
SupportivePartitionActor = ActorIdFromProto(proposeTx->Record.GetSupportivePartitionActor());
const auto& record = proposeTx->GetRecord();
if (record.HasSupportivePartitionActor()) {
SupportivePartitionActor = ActorIdFromProto(record.GetSupportivePartitionActor());
}
Y_ABORT_UNLESS(ProposeTransaction);
}
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,

void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
{
auto& record = ev->Record;
const auto& record = ev->GetRecord();

int oldConfigVersion = Config.HasVersion() ? static_cast<int>(Config.GetVersion()) : -1;
int newConfigVersion = NewConfig.HasVersion() ? static_cast<int>(NewConfig.GetVersion()) : oldConfigVersion;
Expand Down Expand Up @@ -3259,9 +3259,9 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co

void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << ev->Get()->Record.ShortDebugString());
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << event.ShortDebugString());

NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
switch (event.GetTxBodyCase()) {
case NKikimrPQ::TEvProposeTransaction::kData:
HandleDataTransaction(ev->Release(), ctx);
Expand Down Expand Up @@ -3316,7 +3316,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
const TActorContext& ctx)
{
NKikimrPQ::TEvProposeTransaction& event = ev->Record;
NKikimrPQ::TEvProposeTransaction& event = *ev->MutableRecord();
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(event.HasData());
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
Expand Down Expand Up @@ -3427,7 +3427,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
const TActorContext& ctx)
{
NKikimrPQ::TEvProposeTransaction& event = ev->Record;
const NKikimrPQ::TEvProposeTransaction& event = ev->GetRecord();
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kConfig);
Y_ABORT_UNLESS(event.HasConfig());

Expand Down Expand Up @@ -3705,7 +3705,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
const auto front = std::move(EvProposeTransactionQueue.front());
EvProposeTransactionQueue.pop_front();

const NKikimrPQ::TEvProposeTransaction& event = front->Record;
const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord();
TDistributedTransaction& tx = Txs[event.GetTxId()];

switch (tx.State) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/pq_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
try {
runtime.ResetScheduledCount();

THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
auto request = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
for (ui32 i = 0; i < parameters.partitions; ++i) {
request->Record.MutableTabletConfig()->AddPartitionIds(i);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition,
bool immediate,
ui64 txId)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
auto* body = event->Record.MutableData();
Expand Down Expand Up @@ -1606,7 +1606,7 @@ ui64 TPartitionTxTestHelper::MakeAndSendWriteTx(const TSrcIdMap& srcIdsAffected)
ui64 TPartitionTxTestHelper::MakeAndSendImmediateTx(const TSrcIdMap& srcIdsAffected) {
auto actIter = AddWriteTxImpl(srcIdsAffected, NextActId++, 0);

auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
auto* body = event->Record.MutableData();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/pqtablet_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ void TPQTabletFixture::SendToPipe(const TActorId& sender,

void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
THashSet<ui32> partitions;

ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/user_action_processor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition,
bool immediate,
ui64 txId)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

ActorIdToProto(Ctx->Edge, event->Record.MutableSource());
auto* body = event->Record.MutableTxBody();
Expand All @@ -665,7 +665,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition,

void TUserActionProcessorFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

//
// Source
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,15 +695,15 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
const TString& databasePath,
TTxState::ETxType txType,
const TOperationContext& context)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
event->Record.SetTxId(ui64(txId));
ActorIdToProto(context.SS->SelfId(), event->Record.MutableSourceActor());

Expand All @@ -719,7 +719,7 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
databasePath);
if (bootstrapConfig) {
Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup);
event->Record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(*bootstrapConfig);
event->PreSerializedData += bootstrapConfig->GetPreSerializedProposeTransaction();
}

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
Expand All @@ -734,15 +734,15 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
const TString& databasePath,
TTxState::ETxType txType,
const TOperationContext& context)
{
auto event = MakeHolder<TEvPersQueue::TEvUpdateConfig>();
auto event = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
event->Record.SetTxId(ui64(txId));

MakePQTabletConfig(context,
Expand All @@ -757,7 +757,7 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
databasePath);
if (bootstrapConfig) {
Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup);
event->Record.MutableBootstrapConfig()->CopyFrom(*bootstrapConfig);
event->PreSerializedData += bootstrapConfig->GetPreSerializedUpdateConfig();
}

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
Expand Down
55 changes: 51 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,54 @@ class TDone: public TSubOperationState {

namespace NPQState {

class TBootstrapConfigWrapper: public NKikimrPQ::TBootstrapConfig {
struct TSerializedProposeTransaction {
TString Value;

static TSerializedProposeTransaction Serialize(const NKikimrPQ::TBootstrapConfig& value) {
NKikimrPQ::TEvProposeTransaction record;
record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(value);
return {record.SerializeAsString()};
}
};

struct TSerializedUpdateConfig {
TString Value;

static TSerializedUpdateConfig Serialize(const NKikimrPQ::TBootstrapConfig& value) {
NKikimrPQ::TUpdateConfig record;
record.MutableBootstrapConfig()->CopyFrom(value);
return {record.SerializeAsString()};
}
};

mutable std::optional<std::variant<
TSerializedProposeTransaction,
TSerializedUpdateConfig
>> PreSerialized;

template <typename T>
const TString& Get() const {
if (!PreSerialized) {
PreSerialized.emplace(T::Serialize(*this));
}

const auto* value = std::get_if<T>(&PreSerialized.value());
Y_ABORT_UNLESS(value);

return value->Value;
}

public:
const TString& GetPreSerializedProposeTransaction() const {
return Get<TSerializedProposeTransaction>();
}

const TString& GetPreSerializedUpdateConfig() const {
return Get<TSerializedUpdateConfig>();
}
};

class TConfigureParts: public TSubOperationState {
private:
TOperationId OperationId;
Expand Down Expand Up @@ -627,7 +675,6 @@ class TConfigureParts: public TSubOperationState {
return false;
}


bool ProgressState(TOperationContext& context) override {
TTabletId ssId = context.SS->SelfTabletId();

Expand Down Expand Up @@ -669,7 +716,7 @@ class TConfigureParts: public TSubOperationState {
TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString();
auto topicPath = TPath::Init(txState->TargetPathId, context.SS);

std::optional<NKikimrPQ::TBootstrapConfig> bootstrapConfig;
std::optional<TBootstrapConfigWrapper> bootstrapConfig;
if (txState->TxType == TTxState::TxCreatePQGroup && topicPath.Parent().IsCdcStream()) {
bootstrapConfig.emplace();

Expand Down Expand Up @@ -918,7 +965,7 @@ class TConfigureParts: public TSubOperationState {
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
Expand All @@ -931,7 +978,7 @@ class TConfigureParts: public TSubOperationState {
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
Expand Down

0 comments on commit e8c3843

Please sign in to comment.