Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

24-3: Pre-serialized bootstrap config #9342

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class TMessageBusServerPersQueueRequestTestBase: public TTestBase {
static int version = 0;
++version;

THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
auto request = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
for (size_t i : partitions) {
request->Record.MutableTabletConfig()->AddPartitionIds(i);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2595,7 +2595,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
for (auto& [tabletId, t] : topicTxs) {
auto& transaction = t.tx;

auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();

if (t.hasWrite && writeId.Defined()) {
auto* w = transaction.MutableWriteId();
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/persqueue/events/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,15 @@ struct TEvPersQueue {
TEvResponse() {}
};

struct TEvUpdateConfig: public TEventPB<TEvUpdateConfig,
struct TEvUpdateConfig: public TEventPreSerializedPB<TEvUpdateConfig,
NKikimrPQ::TUpdateConfig, EvUpdateConfig> {
TEvUpdateConfig() {}
};

struct TEvUpdateConfigBuilder: public TEvUpdateConfig {
using TBase::Record;
};

struct TEvUpdateBalancerConfig: public TEventPB<TEvUpdateBalancerConfig,
NKikimrPQ::TUpdateBalancerConfig, EvUpdateBalancerConfig> {
TEvUpdateBalancerConfig() {}
Expand Down Expand Up @@ -245,7 +249,11 @@ struct TEvPersQueue {
{}
};

struct TEvProposeTransaction : public TEventPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
struct TEvProposeTransaction : public TEventPreSerializedPB<TEvProposeTransaction, NKikimrPQ::TEvProposeTransaction, EvProposeTransaction> {
};

struct TEvProposeTransactionBuilder: public TEvProposeTransaction {
using TBase::Record;
};

struct TEvProposeTransactionResult : public TEventPB<TEvProposeTransactionResult, NKikimrPQ::TEvProposeTransactionResult, EvProposeTransactionResult> {
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ void TPartition::Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorCo

void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(event.HasData());
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
Expand Down Expand Up @@ -1990,7 +1990,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
return EProcessResult::Continue;
}
t->Predicate.ConstructInPlace(true);
return PreProcessImmediateTx(t->ProposeTransaction->Record);
return PreProcessImmediateTx(t->ProposeTransaction->GetRecord());

} else if (t->Tx) { // Distributed TX
if (t->Predicate.Defined()) { // Predicate defined - either failed previously or Tx created with predicate defined.
Expand Down Expand Up @@ -2573,7 +2573,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
void TPartition::ExecImmediateTx(TTransaction& t)
{
--ImmediateTxCount;
auto& record = t.ProposeTransaction->Record;
const auto& record = t.ProposeTransaction->GetRecord();
Y_ABORT_UNLESS(record.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(record.HasData());

Expand All @@ -2586,7 +2586,7 @@ void TPartition::ExecImmediateTx(TTransaction& t)
t.Message);
return;
}
for (auto& operation : record.GetData().GetOperations()) {
for (const auto& operation : record.GetData().GetOperations()) {
if (!operation.HasBegin() || !operation.HasEnd() || !operation.HasConsumer()) {
continue; //Write operation - handled separately via WriteInfo
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ struct TTransaction {
: ProposeTransaction(proposeTx)
, State(ECommitState::Committed)
{
if (proposeTx->Record.HasSupportivePartitionActor()) {
SupportivePartitionActor = ActorIdFromProto(proposeTx->Record.GetSupportivePartitionActor());
const auto& record = proposeTx->GetRecord();
if (record.HasSupportivePartitionActor()) {
SupportivePartitionActor = ActorIdFromProto(record.GetSupportivePartitionActor());
}
Y_ABORT_UNLESS(ProposeTransaction);
}
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1631,7 +1631,7 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,

void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
{
auto& record = ev->Record;
const auto& record = ev->GetRecord();

int oldConfigVersion = Config.HasVersion() ? Config.GetVersion() : -1;
int newConfigVersion = NewConfig.HasVersion() ? NewConfig.GetVersion() : oldConfigVersion;
Expand Down Expand Up @@ -3266,9 +3266,9 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co

void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << ev->Get()->Record.ShortDebugString());
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << event.ShortDebugString());

NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
switch (event.GetTxBodyCase()) {
case NKikimrPQ::TEvProposeTransaction::kData:
HandleDataTransaction(ev->Release(), ctx);
Expand Down Expand Up @@ -3323,7 +3323,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
const TActorContext& ctx)
{
NKikimrPQ::TEvProposeTransaction& event = ev->Record;
NKikimrPQ::TEvProposeTransaction& event = *ev->MutableRecord();
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(event.HasData());
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
Expand Down Expand Up @@ -3434,7 +3434,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
const TActorContext& ctx)
{
NKikimrPQ::TEvProposeTransaction& event = ev->Record;
const NKikimrPQ::TEvProposeTransaction& event = ev->GetRecord();
Y_ABORT_UNLESS(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kConfig);
Y_ABORT_UNLESS(event.HasConfig());

Expand Down Expand Up @@ -3712,7 +3712,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
const auto front = std::move(EvProposeTransactionQueue.front());
EvProposeTransactionQueue.pop_front();

const NKikimrPQ::TEvProposeTransaction& event = front->Record;
const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord();
TDistributedTransaction& tx = Txs[event.GetTxId()];

switch (tx.State) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/pq_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
try {
runtime.ResetScheduledCount();

THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
auto request = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
for (ui32 i = 0; i < parameters.partitions; ++i) {
request->Record.MutableTabletConfig()->AddPartitionIds(i);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition,
bool immediate,
ui64 txId)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
auto* body = event->Record.MutableData();
Expand Down Expand Up @@ -1606,7 +1606,7 @@ ui64 TPartitionTxTestHelper::MakeAndSendWriteTx(const TSrcIdMap& srcIdsAffected)
ui64 TPartitionTxTestHelper::MakeAndSendImmediateTx(const TSrcIdMap& srcIdsAffected) {
auto actIter = AddWriteTxImpl(srcIdsAffected, NextActId++, 0);

auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
auto* body = event->Record.MutableData();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/pqtablet_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ void TPQTabletFixture::SendToPipe(const TActorId& sender,

void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
THashSet<ui32> partitions;

ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/user_action_processor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition,
bool immediate,
ui64 txId)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

ActorIdToProto(Ctx->Edge, event->Record.MutableSource());
auto* body = event->Record.MutableTxBody();
Expand All @@ -665,7 +665,7 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(ui32 partition,

void TUserActionProcessorFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();

//
// Source
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,15 +695,15 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
const TString& databasePath,
TTxState::ETxType txType,
const TOperationContext& context)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
auto event = MakeHolder<TEvPersQueue::TEvProposeTransactionBuilder>();
event->Record.SetTxId(ui64(txId));
ActorIdToProto(context.SS->SelfId(), event->Record.MutableSourceActor());

Expand All @@ -719,7 +719,7 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
databasePath);
if (bootstrapConfig) {
Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup);
event->Record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(*bootstrapConfig);
event->PreSerializedData += bootstrapConfig->GetPreSerializedProposeTransaction();
}

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
Expand All @@ -734,15 +734,15 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
const TString& databasePath,
TTxState::ETxType txType,
const TOperationContext& context)
{
auto event = MakeHolder<TEvPersQueue::TEvUpdateConfig>();
auto event = MakeHolder<TEvPersQueue::TEvUpdateConfigBuilder>();
event->Record.SetTxId(ui64(txId));

MakePQTabletConfig(context,
Expand All @@ -757,7 +757,7 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
databasePath);
if (bootstrapConfig) {
Y_ABORT_UNLESS(txType == TTxState::TxCreatePQGroup);
event->Record.MutableBootstrapConfig()->CopyFrom(*bootstrapConfig);
event->PreSerializedData += bootstrapConfig->GetPreSerializedUpdateConfig();
}

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
Expand Down
55 changes: 51 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,54 @@ class TDone: public TSubOperationState {

namespace NPQState {

class TBootstrapConfigWrapper: public NKikimrPQ::TBootstrapConfig {
struct TSerializedProposeTransaction {
TString Value;

static TSerializedProposeTransaction Serialize(const NKikimrPQ::TBootstrapConfig& value) {
NKikimrPQ::TEvProposeTransaction record;
record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(value);
return {record.SerializeAsString()};
}
};

struct TSerializedUpdateConfig {
TString Value;

static TSerializedUpdateConfig Serialize(const NKikimrPQ::TBootstrapConfig& value) {
NKikimrPQ::TUpdateConfig record;
record.MutableBootstrapConfig()->CopyFrom(value);
return {record.SerializeAsString()};
}
};

mutable std::optional<std::variant<
TSerializedProposeTransaction,
TSerializedUpdateConfig
>> PreSerialized;

template <typename T>
const TString& Get() const {
if (!PreSerialized) {
PreSerialized.emplace(T::Serialize(*this));
}

const auto* value = std::get_if<T>(&PreSerialized.value());
Y_ABORT_UNLESS(value);

return value->Value;
}

public:
const TString& GetPreSerializedProposeTransaction() const {
return Get<TSerializedProposeTransaction>();
}

const TString& GetPreSerializedUpdateConfig() const {
return Get<TSerializedUpdateConfig>();
}
};

class TConfigureParts: public TSubOperationState {
private:
TOperationId OperationId;
Expand Down Expand Up @@ -627,7 +675,6 @@ class TConfigureParts: public TSubOperationState {
return false;
}


bool ProgressState(TOperationContext& context) override {
TTabletId ssId = context.SS->SelfTabletId();

Expand Down Expand Up @@ -669,7 +716,7 @@ class TConfigureParts: public TSubOperationState {
TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString();
auto topicPath = TPath::Init(txState->TargetPathId, context.SS);

std::optional<NKikimrPQ::TBootstrapConfig> bootstrapConfig;
std::optional<TBootstrapConfigWrapper> bootstrapConfig;
if (txState->TxType == TTxState::TxCreatePQGroup && topicPath.Parent().IsCdcStream()) {
bootstrapConfig.emplace();

Expand Down Expand Up @@ -918,7 +965,7 @@ class TConfigureParts: public TSubOperationState {
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
Expand All @@ -931,7 +978,7 @@ class TConfigureParts: public TSubOperationState {
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const std::optional<TBootstrapConfigWrapper>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
Expand Down
Loading