diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index d68a72f9b659..8f7f902b3ecc 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -1,6 +1,7 @@ #include "tx_write.h" namespace NKikimr::NColumnShard { + bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) { NKikimrTxColumnShard::TLogicalMetadata meta; meta.SetNumRows(batch->GetRowsCount()); @@ -28,7 +29,6 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali return false; } - bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute"); ACFL_DEBUG("event", "start_execute"); @@ -76,19 +76,12 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { } for (auto&& aggr : buffer.GetAggregations()) { const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta(); - std::unique_ptr result; - TWriteOperation::TPtr operation; if (!writeMeta.HasLongTxId()) { - operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); + auto operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); Y_ABORT_UNLESS(operation); Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); - } - if (operation) { operation->OnWriteFinish(txc, aggr->GetWriteIds()); - auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc); - Y_UNUSED(txInfo); - NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId()); - Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo)); + ProposeTransaction(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetTxId()), "", writeMeta.GetSource(), 0, txc); } else { Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1); Results.emplace_back(std::make_unique(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS)); @@ -97,6 +90,15 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { return true; } +void TTxWrite::OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) { + Y_UNUSED(proposeResult); + Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), txInfo.TxId, Self->GetProgressTxController().BuildCoordinatorInfo(txInfo))); +} + +void TTxWrite::OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) { + AFL_VERIFY("Unexpected behaviour")("tx_id", txInfo.TxId)("details", proposeResult.DebugString()); +} + void TTxWrite::Complete(const TActorContext& ctx) { NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete"); const auto now = TMonotonic::Now(); diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h index 6086542940f6..4af16dc94cee 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h @@ -1,13 +1,14 @@ #pragma once #include +#include #include namespace NKikimr::NColumnShard { -class TTxWrite : public TTransactionBase { +class TTxWrite : public TProposeTransactionBase { public: TTxWrite(TColumnShard* self, const TEvPrivate::TEvWriteBlobsResult::TPtr& putBlobResult) - : TBase(self) + : TProposeTransactionBase(self) , PutBlobResult(putBlobResult) , TabletTxNo(++Self->TabletTxCounter) {} @@ -16,13 +17,17 @@ class TTxWrite : public TTransactionBase { void Complete(const TActorContext& ctx) override; TTxType GetTxType() const override { return TXTYPE_WRITE; } - bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId); private: TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult; const ui32 TabletTxNo; std::vector> Results; + + bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId); + void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) override; + void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) override; + TStringBuilder TxPrefix() const { return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] "; } diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 4f04f60ff8d1..89393b15c579 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -3,17 +3,17 @@ #include "columnshard_schema.h" #include #include +#include namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; -class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase { +class TTxProposeTransaction : public TProposeTransactionBase { public: TTxProposeTransaction(TColumnShard* self, TEvColumnShard::TEvProposeTransaction::TPtr& ev) - : TBase(self) + : TProposeTransactionBase(self) , Ev(ev) - , TabletTxNo(++Self->TabletTxCounter) {} bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; @@ -22,25 +22,16 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase Result; - TStringBuilder TxPrefix() const { - return TStringBuilder() << "TxProposeTransaction[" << ToString(TabletTxNo) << "] "; - } - - TString TxSuffix() const { - return TStringBuilder() << " at tablet " << Self->TabletID(); - } - - void ConstructResult(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo); + void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) override; + void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) override; TTxController::TProposeResult ProposeTtlDeprecated(const TString& txBody); }; bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) { Y_ABORT_UNLESS(Ev); - LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix()); txc.DB.NoMoreReadsForTx(); NIceDb::TNiceDb db(txc.DB); @@ -48,9 +39,9 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex Self->IncCounter(COUNTER_PREPARE_REQUEST); auto& record = Proto(Ev->Get()); - auto txKind = record.GetTxKind(); - ui64 txId = record.GetTxId(); - auto& txBody = record.GetTxBody(); + const auto txKind = record.GetTxKind(); + const ui64 txId = record.GetTxId(); + const auto& txBody = record.GetTxBody(); if (txKind == NKikimrTxColumnShard::TX_KIND_TTL) { auto proposeResult = ProposeTtlDeprecated(txBody); @@ -71,39 +62,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId()); } } - - TTxController::TBasicTxInfo fakeTxInfo; - fakeTxInfo.TxId = txId; - fakeTxInfo.TxKind = txKind; - - auto txOperator = TTxController::ITransactionOperatior::TFactory::MakeHolder(txKind, fakeTxInfo); - if (!txOperator || !txOperator->Parse(txBody)) { - TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txId - << (txOperator ? ". Parsing error " : ". Unknown operator for txKind")); - ConstructResult(proposeResult, fakeTxInfo); - return true; - } - - auto txInfoPtr = Self->ProgressTxController->GetTxInfo(txId); - if (!!txInfoPtr) { - if (txInfoPtr->Source != Ev->Get()->GetSource() || txInfoPtr->Cookie != Ev->Cookie) { - TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txId << " has already been proposed"); - ConstructResult(proposeResult, fakeTxInfo); - } - TTxController::TProposeResult proposeResult; - ConstructResult(proposeResult, *txInfoPtr); - } else { - auto proposeResult = txOperator->Propose(*Self, txc, false); - if (!!proposeResult) { - const auto& txInfo = txOperator->TxWithDeadline() ? Self->ProgressTxController->RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc) - : Self->ProgressTxController->RegisterTx(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc); - - ConstructResult(proposeResult, txInfo); - } else { - ConstructResult(proposeResult, fakeTxInfo); - } - } - AFL_VERIFY(!!Result); + ProposeTransaction(TTxController::TBasicTxInfo(txKind, txId), txBody, Ev->Get()->GetSource(), Ev->Cookie, txc); return true; } @@ -154,21 +113,21 @@ TTxController::TProposeResult TTxProposeTransaction::ProposeTtlDeprecated(const return TTxController::TProposeResult(); } -void TTxProposeTransaction::ConstructResult(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) { +void TTxProposeTransaction::OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) { Result = std::make_unique(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage()); - if (proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED) { - Self->IncCounter(COUNTER_PREPARE_SUCCESS); - Result->Record.SetMinStep(txInfo.MinStep); - Result->Record.SetMaxStep(txInfo.MaxStep); - if (Self->ProcessingParams) { - Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators()); - } - } else if (proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) { - Self->IncCounter(COUNTER_PREPARE_SUCCESS); - } else { - Self->IncCounter(COUNTER_PREPARE_ERROR); - LOG_S_INFO(TxPrefix() << "error txId " << txInfo.TxId << " " << proposeResult.GetStatusMessage() << TxSuffix()); + Self->IncCounter(COUNTER_PREPARE_ERROR); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", proposeResult.GetStatusMessage())("tablet_id", Self->TabletID())("tx_id", txInfo.TxId); +} + +void TTxProposeTransaction::OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) { + AFL_VERIFY(proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED)("tx_id", txInfo.TxId)("details", proposeResult.DebugString()); + Result = std::make_unique(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage()); + Result->Record.SetMinStep(txInfo.MinStep); + Result->Record.SetMaxStep(txInfo.MaxStep); + if (Self->ProcessingParams) { + Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators()); } + Self->IncCounter(COUNTER_PREPARE_SUCCESS); } void TTxProposeTransaction::Complete(const TActorContext& ctx) { @@ -180,12 +139,6 @@ void TTxProposeTransaction::Complete(const TActorContext& ctx) { void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { - auto& record = Proto(ev->Get()); - auto txKind = record.GetTxKind(); - ui64 txId = record.GetTxId(); - LOG_S_DEBUG("ProposeTransaction " << NKikimrTxColumnShard::ETransactionKind_Name(txKind) - << " txId " << txId << " at tablet " << TabletID()); - Execute(new TTxProposeTransaction(this, ev), ctx); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 0b0a7a779228..390487176e03 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -570,6 +570,11 @@ class TColumnShard return TablesManager.MutablePrimaryIndexAsVerified(); } + TTxController& GetProgressTxController() const { + AFL_VERIFY(ProgressTxController); + return *ProgressTxController; + } + bool HasIndex() const { return !!TablesManager.GetPrimaryIndex(); } diff --git a/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp b/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp new file mode 100644 index 000000000000..ef50e2ae5959 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp @@ -0,0 +1,37 @@ +#include "propose_transaction_base.h" + +#include + + +namespace NKikimr::NColumnShard { + + void TProposeTransactionBase::ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, TTransactionContext& txc) { + auto txOperator = TTxController::ITransactionOperatior::TFactory::MakeHolder(txInfo.TxKind, TTxController::TTxInfo(txInfo.TxKind, txInfo.TxId)); + if (!txOperator || !txOperator->Parse(txBody)) { + TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txInfo.TxId + << (txOperator ? ". Parsing error " : ". Unknown operator for txKind")); + OnProposeError(proposeResult, txInfo); + return; + } + + auto txInfoPtr = Self->GetProgressTxController().GetTxInfo(txInfo.TxId); + if (!!txInfoPtr) { + if (txInfoPtr->Source != source || txInfoPtr->Cookie != cookie) { + TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txInfo.TxId << " has already been proposed"); + OnProposeError(proposeResult, txInfo); + } + TTxController::TProposeResult proposeResult; + OnProposeResult(proposeResult, *txInfoPtr); + } else { + auto proposeResult = txOperator->Propose(*Self, txc, false); + if (!!proposeResult) { + const auto fullTxInfo = txOperator->TxWithDeadline() ? Self->GetProgressTxController().RegisterTxWithDeadline(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc) + : Self->GetProgressTxController().RegisterTx(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc); + + OnProposeResult(proposeResult, fullTxInfo); + } else { + OnProposeError(proposeResult, txInfo); + } + } + } +} diff --git a/ydb/core/tx/columnshard/transactions/propose_transaction_base.h b/ydb/core/tx/columnshard/transactions/propose_transaction_base.h new file mode 100644 index 000000000000..7657c4aff475 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/propose_transaction_base.h @@ -0,0 +1,22 @@ +#pragma once +#include "tx_controller.h" + +namespace NKikimr::NColumnShard { + +class TColumnShard; + +class TProposeTransactionBase : public NTabletFlatExecutor::TTransactionBase { +public: + TProposeTransactionBase(TColumnShard* self) + : TBase(self) + {} + +protected: + void ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, TTransactionContext& txc); + + virtual void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) = 0; + virtual void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) = 0; +}; + + +} diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.cpp b/ydb/core/tx/columnshard/transactions/tx_controller.cpp index 1137456d3d69..6b90247c0957 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.cpp +++ b/ydb/core/tx/columnshard/transactions/tx_controller.cpp @@ -23,7 +23,7 @@ ui64 TTxController::GetAllowedStep() const { } ui64 TTxController::GetMemoryUsage() const { - return BasicTxInfo.size() * sizeof(TBasicTxInfo) + + return BasicTxInfo.size() * sizeof(TTxInfo) + DeadlineQueue.size() * sizeof(TPlanQueueItem) + (PlanQueue.size() + RunningQueue.size()) * sizeof(TPlanQueueItem); } @@ -45,9 +45,11 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { return false; while (!rowset.EndOfSet()) { - ui64 txId = rowset.GetValue(); - auto& txInfo = BasicTxInfo[txId]; - txInfo.TxId = txId; + const ui64 txId = rowset.GetValue(); + const NKikimrTxColumnShard::ETransactionKind txKind = rowset.GetValue(); + + auto txInfoIt = BasicTxInfo.emplace(txId, TTxInfo(txKind, txId)).first; + auto& txInfo = txInfoIt->second; txInfo.MaxStep = rowset.GetValue(); if (txInfo.MaxStep != Max()) { txInfo.MinStep = txInfo.MaxStep - MaxCommitTxDelay.MilliSeconds(); @@ -55,7 +57,6 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { txInfo.PlanStep = rowset.GetValueOrDefault(0); txInfo.Source = rowset.GetValue(); txInfo.Cookie = rowset.GetValue(); - txInfo.TxKind = rowset.GetValue(); if (txInfo.PlanStep != 0) { PlanQueue.emplace(txInfo.PlanStep, txInfo.TxId); @@ -90,12 +91,11 @@ TTxController::ITransactionOperatior::TPtr TTxController::GetVerifiedTxOperator( return it->second; } -const TTxController::TBasicTxInfo& TTxController::RegisterTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) { +TTxController::TTxInfo TTxController::RegisterTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); - auto& txInfo = BasicTxInfo[txId]; - txInfo.TxId = txId; - txInfo.TxKind = txKind; + auto txInfoIt = BasicTxInfo.emplace(txId, TTxInfo(txKind, txId)).first; + auto& txInfo = txInfoIt->second; txInfo.Source = source; txInfo.Cookie = cookie; @@ -108,12 +108,11 @@ const TTxController::TBasicTxInfo& TTxController::RegisterTx(const ui64 txId, co return txInfo; } -const TTxController::TBasicTxInfo& TTxController::RegisterTxWithDeadline(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) { +TTxController::TTxInfo TTxController::RegisterTxWithDeadline(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); - auto& txInfo = BasicTxInfo[txId]; - txInfo.TxId = txId; - txInfo.TxKind = txKind; + auto txInfoIt = BasicTxInfo.emplace(txId, TTxInfo(txKind, txId)).first; + auto& txInfo = txInfoIt->second; txInfo.Source = source; txInfo.Cookie = cookie; txInfo.MinStep = GetAllowedStep(); @@ -174,7 +173,7 @@ bool TTxController::CancelTx(const ui64 txId, NTabletFlatExecutor::TTransactionC return true; } -std::optional TTxController::StartPlannedTx() { +std::optional TTxController::StartPlannedTx() { if (!PlanQueue.empty()) { auto node = PlanQueue.extract(PlanQueue.begin()); auto& item = node.value(); @@ -206,17 +205,19 @@ std::optional TTxController::GetPlannedTx() const return *PlanQueue.begin(); } -const TTxController::TBasicTxInfo* TTxController::GetTxInfo(const ui64 txId) const { - return BasicTxInfo.FindPtr(txId); +std::optional TTxController::GetTxInfo(const ui64 txId) const { + auto txPtr = BasicTxInfo.FindPtr(txId); + if (txPtr) { + return *txPtr; + } + return std::nullopt; } -NEvents::TDataEvents::TCoordinatorInfo TTxController::GetCoordinatorInfo(const ui64 txId) const { - auto txInfo = BasicTxInfo.FindPtr(txId); - Y_ABORT_UNLESS(txInfo); +NEvents::TDataEvents::TCoordinatorInfo TTxController::BuildCoordinatorInfo(const TTxInfo& txInfo) const { if (Owner.ProcessingParams) { - return NEvents::TDataEvents::TCoordinatorInfo(txInfo->MinStep, txInfo->MaxStep, Owner.ProcessingParams->GetCoordinators()); + return NEvents::TDataEvents::TCoordinatorInfo(txInfo.MinStep, txInfo.MaxStep, Owner.ProcessingParams->GetCoordinators()); } - return NEvents::TDataEvents::TCoordinatorInfo(txInfo->MinStep, txInfo->MaxStep, {}); + return NEvents::TDataEvents::TCoordinatorInfo(txInfo.MinStep, txInfo.MaxStep, {}); } size_t TTxController::CleanExpiredTxs(NTabletFlatExecutor::TTransactionContext& txc) { diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.h b/ydb/core/tx/columnshard/transactions/tx_controller.h index fb38fae9d32e..1013f128a239 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.h +++ b/ydb/core/tx/columnshard/transactions/tx_controller.h @@ -27,13 +27,25 @@ class TTxController { }; struct TBasicTxInfo { - ui64 TxId; + const NKikimrTxColumnShard::ETransactionKind TxKind; + const ui64 TxId; + public: + TBasicTxInfo(const NKikimrTxColumnShard::ETransactionKind& txKind, const ui64 txId) + : TxKind(txKind) + , TxId(txId) + {} + }; + + struct TTxInfo : public TBasicTxInfo { ui64 MaxStep = Max(); ui64 MinStep = 0; ui64 PlanStep = 0; TActorId Source; ui64 Cookie = 0; - NKikimrTxColumnShard::ETransactionKind TxKind; + public: + TTxInfo(const NKikimrTxColumnShard::ETransactionKind& txKind, const ui64 txId) + : TBasicTxInfo(txKind, txId) + {} }; class TProposeResult { @@ -47,18 +59,22 @@ class TTxController { {} bool operator!() const { - return Status != NKikimrTxColumnShard::EResultStatus::PREPARED; + return Status != NKikimrTxColumnShard::EResultStatus::PREPARED && Status != NKikimrTxColumnShard::EResultStatus::SUCCESS; + } + + TString DebugString() const { + return TStringBuilder() << "status=" << (ui64) Status << ";message=" << StatusMessage; } }; class ITransactionOperatior { protected: - TBasicTxInfo TxInfo; + TTxInfo TxInfo; public: using TPtr = std::shared_ptr; - using TFactory = NObjectFactory::TParametrizedObjectFactory; + using TFactory = NObjectFactory::TParametrizedObjectFactory; - ITransactionOperatior(const TBasicTxInfo& txInfo) + ITransactionOperatior(const TTxInfo& txInfo) : TxInfo(txInfo) {} @@ -87,7 +103,7 @@ class TTxController { private: const TDuration MaxCommitTxDelay = TDuration::Seconds(30); TColumnShard& Owner; - THashMap BasicTxInfo; + THashMap BasicTxInfo; std::set DeadlineQueue; std::set PlanQueue; std::set RunningQueue; @@ -109,19 +125,19 @@ class TTxController { bool Load(NTabletFlatExecutor::TTransactionContext& txc); - const TBasicTxInfo& RegisterTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); - const TBasicTxInfo& RegisterTxWithDeadline(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); + TTxInfo RegisterTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); + TTxInfo RegisterTxWithDeadline(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); bool CancelTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); - std::optional StartPlannedTx(); + std::optional StartPlannedTx(); void FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); void CompleteRunningTx(const TPlanQueueItem& tx); std::optional GetPlannedTx() const; TPlanQueueItem GetFrontTx() const; - const TBasicTxInfo* GetTxInfo(const ui64 txId) const; - NEvents::TDataEvents::TCoordinatorInfo GetCoordinatorInfo(const ui64 txId) const; + std::optional GetTxInfo(const ui64 txId) const; + NEvents::TDataEvents::TCoordinatorInfo BuildCoordinatorInfo(const TTxInfo& txInfo) const; size_t CleanExpiredTxs(NTabletFlatExecutor::TTransactionContext& txc); diff --git a/ydb/core/tx/columnshard/transactions/ya.make b/ydb/core/tx/columnshard/transactions/ya.make index 94f14e7492b5..3b3a98aca4c9 100644 --- a/ydb/core/tx/columnshard/transactions/ya.make +++ b/ydb/core/tx/columnshard/transactions/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( tx_controller.cpp locks_db.cpp + propose_transaction_base.cpp ) PEERDIR(