Skip to content

Commit

Permalink
Base transaction class for all propose operations (ydb-platform#2208)
Browse files Browse the repository at this point in the history
Co-authored-by: nsofya <nsofya@yandex.ru>
  • Loading branch information
nsofya and nsofya authored Feb 24, 2024
1 parent 0dd4395 commit f5dc352
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 115 deletions.
22 changes: 12 additions & 10 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "tx_write.h"

namespace NKikimr::NColumnShard {

bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) {
NKikimrTxColumnShard::TLogicalMetadata meta;
meta.SetNumRows(batch->GetRowsCount());
Expand Down Expand Up @@ -28,7 +29,6 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
return false;
}


bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
Expand Down Expand Up @@ -76,19 +76,12 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
}
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
std::unique_ptr<TEvColumnShard::TEvWriteResult> result;
TWriteOperation::TPtr operation;
if (!writeMeta.HasLongTxId()) {
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
auto operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
}
if (operation) {
operation->OnWriteFinish(txc, aggr->GetWriteIds());
auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
Y_UNUSED(txInfo);
NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId());
Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo));
ProposeTransaction(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetTxId()), "", writeMeta.GetSource(), 0, txc);
} else {
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
Results.emplace_back(std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS));
Expand All @@ -97,6 +90,15 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
return true;
}

void TTxWrite::OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) {
Y_UNUSED(proposeResult);
Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), txInfo.TxId, Self->GetProgressTxController().BuildCoordinatorInfo(txInfo)));
}

void TTxWrite::OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) {
AFL_VERIFY("Unexpected behaviour")("tx_id", txInfo.TxId)("details", proposeResult.DebugString());
}

void TTxWrite::Complete(const TActorContext& ctx) {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
const auto now = TMonotonic::Now();
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#pragma once
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/transactions/propose_transaction_base.h>
#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>

namespace NKikimr::NColumnShard {

class TTxWrite : public TTransactionBase<TColumnShard> {
class TTxWrite : public TProposeTransactionBase {
public:
TTxWrite(TColumnShard* self, const TEvPrivate::TEvWriteBlobsResult::TPtr& putBlobResult)
: TBase(self)
: TProposeTransactionBase(self)
, PutBlobResult(putBlobResult)
, TabletTxNo(++Self->TabletTxCounter)
{}
Expand All @@ -16,13 +17,17 @@ class TTxWrite : public TTransactionBase<TColumnShard> {
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_WRITE; }

bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);

private:
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
const ui32 TabletTxNo;
std::vector<std::unique_ptr<NActors::IEventBase>> Results;


bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);
void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) override;
void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) override;

TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
}
Expand Down
91 changes: 22 additions & 69 deletions ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
#include "columnshard_schema.h"
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/core/tx/columnshard/transactions/propose_transaction_base.h>

namespace NKikimr::NColumnShard {

using namespace NTabletFlatExecutor;

class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
class TTxProposeTransaction : public TProposeTransactionBase {
public:
TTxProposeTransaction(TColumnShard* self, TEvColumnShard::TEvProposeTransaction::TPtr& ev)
: TBase(self)
: TProposeTransactionBase(self)
, Ev(ev)
, TabletTxNo(++Self->TabletTxCounter)
{}

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
Expand All @@ -22,35 +22,26 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu

private:
TEvColumnShard::TEvProposeTransaction::TPtr Ev;
const ui32 TabletTxNo;
std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> Result;

TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxProposeTransaction[" << ToString(TabletTxNo) << "] ";
}

TString TxSuffix() const {
return TStringBuilder() << " at tablet " << Self->TabletID();
}

void ConstructResult(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo);
void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) override;
void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) override;
TTxController::TProposeResult ProposeTtlDeprecated(const TString& txBody);
};


bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) {
Y_ABORT_UNLESS(Ev);
LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix());

txc.DB.NoMoreReadsForTx();
NIceDb::TNiceDb db(txc.DB);

Self->IncCounter(COUNTER_PREPARE_REQUEST);

auto& record = Proto(Ev->Get());
auto txKind = record.GetTxKind();
ui64 txId = record.GetTxId();
auto& txBody = record.GetTxBody();
const auto txKind = record.GetTxKind();
const ui64 txId = record.GetTxId();
const auto& txBody = record.GetTxBody();

if (txKind == NKikimrTxColumnShard::TX_KIND_TTL) {
auto proposeResult = ProposeTtlDeprecated(txBody);
Expand All @@ -71,39 +62,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId());
}
}

TTxController::TBasicTxInfo fakeTxInfo;
fakeTxInfo.TxId = txId;
fakeTxInfo.TxKind = txKind;

auto txOperator = TTxController::ITransactionOperatior::TFactory::MakeHolder(txKind, fakeTxInfo);
if (!txOperator || !txOperator->Parse(txBody)) {
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txId
<< (txOperator ? ". Parsing error " : ". Unknown operator for txKind"));
ConstructResult(proposeResult, fakeTxInfo);
return true;
}

auto txInfoPtr = Self->ProgressTxController->GetTxInfo(txId);
if (!!txInfoPtr) {
if (txInfoPtr->Source != Ev->Get()->GetSource() || txInfoPtr->Cookie != Ev->Cookie) {
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txId << " has already been proposed");
ConstructResult(proposeResult, fakeTxInfo);
}
TTxController::TProposeResult proposeResult;
ConstructResult(proposeResult, *txInfoPtr);
} else {
auto proposeResult = txOperator->Propose(*Self, txc, false);
if (!!proposeResult) {
const auto& txInfo = txOperator->TxWithDeadline() ? Self->ProgressTxController->RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc)
: Self->ProgressTxController->RegisterTx(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);

ConstructResult(proposeResult, txInfo);
} else {
ConstructResult(proposeResult, fakeTxInfo);
}
}
AFL_VERIFY(!!Result);
ProposeTransaction(TTxController::TBasicTxInfo(txKind, txId), txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
return true;
}

Expand Down Expand Up @@ -154,21 +113,21 @@ TTxController::TProposeResult TTxProposeTransaction::ProposeTtlDeprecated(const
return TTxController::TProposeResult();
}

void TTxProposeTransaction::ConstructResult(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) {
void TTxProposeTransaction::OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) {
Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
if (proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED) {
Self->IncCounter(COUNTER_PREPARE_SUCCESS);
Result->Record.SetMinStep(txInfo.MinStep);
Result->Record.SetMaxStep(txInfo.MaxStep);
if (Self->ProcessingParams) {
Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators());
}
} else if (proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
Self->IncCounter(COUNTER_PREPARE_SUCCESS);
} else {
Self->IncCounter(COUNTER_PREPARE_ERROR);
LOG_S_INFO(TxPrefix() << "error txId " << txInfo.TxId << " " << proposeResult.GetStatusMessage() << TxSuffix());
Self->IncCounter(COUNTER_PREPARE_ERROR);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", proposeResult.GetStatusMessage())("tablet_id", Self->TabletID())("tx_id", txInfo.TxId);
}

void TTxProposeTransaction::OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) {
AFL_VERIFY(proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED)("tx_id", txInfo.TxId)("details", proposeResult.DebugString());
Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
Result->Record.SetMinStep(txInfo.MinStep);
Result->Record.SetMaxStep(txInfo.MaxStep);
if (Self->ProcessingParams) {
Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators());
}
Self->IncCounter(COUNTER_PREPARE_SUCCESS);
}

void TTxProposeTransaction::Complete(const TActorContext& ctx) {
Expand All @@ -180,12 +139,6 @@ void TTxProposeTransaction::Complete(const TActorContext& ctx) {


void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) {
auto& record = Proto(ev->Get());
auto txKind = record.GetTxKind();
ui64 txId = record.GetTxId();
LOG_S_DEBUG("ProposeTransaction " << NKikimrTxColumnShard::ETransactionKind_Name(txKind)
<< " txId " << txId << " at tablet " << TabletID());

Execute(new TTxProposeTransaction(this, ev), ctx);
}

Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,11 @@ class TColumnShard
return TablesManager.MutablePrimaryIndexAsVerified<T>();
}

TTxController& GetProgressTxController() const {
AFL_VERIFY(ProgressTxController);
return *ProgressTxController;
}

bool HasIndex() const {
return !!TablesManager.GetPrimaryIndex();
}
Expand Down
37 changes: 37 additions & 0 deletions ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include "propose_transaction_base.h"

#include <ydb/core/tx/columnshard/columnshard_impl.h>


namespace NKikimr::NColumnShard {

void TProposeTransactionBase::ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, TTransactionContext& txc) {
auto txOperator = TTxController::ITransactionOperatior::TFactory::MakeHolder(txInfo.TxKind, TTxController::TTxInfo(txInfo.TxKind, txInfo.TxId));
if (!txOperator || !txOperator->Parse(txBody)) {
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txInfo.TxId
<< (txOperator ? ". Parsing error " : ". Unknown operator for txKind"));
OnProposeError(proposeResult, txInfo);
return;
}

auto txInfoPtr = Self->GetProgressTxController().GetTxInfo(txInfo.TxId);
if (!!txInfoPtr) {
if (txInfoPtr->Source != source || txInfoPtr->Cookie != cookie) {
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txInfo.TxId << " has already been proposed");
OnProposeError(proposeResult, txInfo);
}
TTxController::TProposeResult proposeResult;
OnProposeResult(proposeResult, *txInfoPtr);
} else {
auto proposeResult = txOperator->Propose(*Self, txc, false);
if (!!proposeResult) {
const auto fullTxInfo = txOperator->TxWithDeadline() ? Self->GetProgressTxController().RegisterTxWithDeadline(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc)
: Self->GetProgressTxController().RegisterTx(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc);

OnProposeResult(proposeResult, fullTxInfo);
} else {
OnProposeError(proposeResult, txInfo);
}
}
}
}
22 changes: 22 additions & 0 deletions ydb/core/tx/columnshard/transactions/propose_transaction_base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once
#include "tx_controller.h"

namespace NKikimr::NColumnShard {

class TColumnShard;

class TProposeTransactionBase : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
public:
TProposeTransactionBase(TColumnShard* self)
: TBase(self)
{}

protected:
void ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, TTransactionContext& txc);

virtual void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) = 0;
virtual void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) = 0;
};


}
Loading

0 comments on commit f5dc352

Please sign in to comment.