Skip to content

Commit

Permalink
tx operations start unification (ydb-platform#7178)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Sep 15, 2024
1 parent e7b4620 commit a7fdd7c
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 34 deletions.
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}

void TTxInit::Complete(const TActorContext& ctx) {
Self->ProgressTxController->StartOperators();
Self->ProgressTxController->OnTabletInit();
Self->SwitchToWork(ctx);
NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(*Self);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace NKikimr::NColumnShard {
public:
using TBase::TBase;

void OnTabletInit(TColumnShard& owner) override {
virtual void DoOnTabletInit(TColumnShard& owner) override {
for (auto&& writeId : WriteIds) {
AFL_VERIFY(owner.LongTxWrites.contains(writeId))("problem", "ltx_not_exists_for_write_id")("txId", GetTxId())("writeId", (ui64)writeId);
owner.AddLongTxWrite(writeId, GetTxId());
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/tx/columnshard/transactions/operators/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ NKikimr::TConclusionStatus TSchemaTransactionOperator::ValidateTables(::google::
} return TConclusionStatus::Success();
}

bool TSchemaTransactionOperator::DoOnStartAsync(TColumnShard& owner) {
void TSchemaTransactionOperator::DoOnTabletInit(TColumnShard& owner) {
AFL_VERIFY(WaitPathIdsToErase.empty());
switch (SchemaTxBody.TxBody_case()) {
case NKikimrTxColumnShard::TSchemaTxBody::kInitShard:
Expand All @@ -190,11 +190,9 @@ bool TSchemaTransactionOperator::DoOnStartAsync(TColumnShard& owner) {
if (WaitPathIdsToErase.size()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "wait_remove_path_id")("pathes", JoinSeq(",", WaitPathIdsToErase))("tx_id", GetTxId());
owner.Subscribers->RegisterSubscriber(std::make_shared<TWaitEraseTablesTxSubscriber>(WaitPathIdsToErase, GetTxId()));
return true;
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "remove_pathes_cleaned")("tx_id", GetTxId());
owner.Execute(new TTxFinishAsyncTransaction(owner, GetTxId()));
return false;
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/transactions/operators/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TSchemaTransactionOperator: public IProposeTxOperator, public TMonitoringO
THashSet<TActorId> NotifySubscribers;
THashSet<ui64> WaitPathIdsToErase;

virtual bool DoOnStartAsync(TColumnShard& owner) override;
virtual void DoOnTabletInit(TColumnShard& owner) override;

template <class TInfoProto>
THashSet<ui64> GetNotErasedTableIds(const TColumnShard& owner, const TInfoProto& tables) const {
Expand Down
24 changes: 8 additions & 16 deletions ydb/core/tx/columnshard/transactions/tx_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,17 @@ TTxController::TTxInfo TTxController::RegisterTxWithDeadline(const std::shared_p
return txInfo;
}

bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
auto opIt = Operators.find(txId);
bool TTxController::AbortTx(const TPlanQueueItem planQueueItem, NTabletFlatExecutor::TTransactionContext& txc) {
auto opIt = Operators.find(planQueueItem.TxId);
Y_ABORT_UNLESS(opIt != Operators.end());
Y_ABORT_UNLESS(opIt->second->GetTxInfo().PlanStep == 0);
opIt->second->ExecuteOnAbort(Owner, txc);
opIt->second->CompleteOnAbort(Owner, NActors::TActivationContext::AsActorContext());

if (opIt->second->GetTxInfo().MaxStep != Max<ui64>()) {
DeadlineQueue.erase(TPlanQueueItem(opIt->second->GetTxInfo().MaxStep, txId));
}
Operators.erase(txId);
AFL_VERIFY(Operators.erase(planQueueItem.TxId));
AFL_VERIFY(DeadlineQueue.erase(planQueueItem));
NIceDb::TNiceDb db(txc.DB);
Schema::EraseTxInfo(db, txId);
Schema::EraseTxInfo(db, planQueueItem.TxId);
return true;
}

Expand Down Expand Up @@ -252,7 +250,7 @@ size_t TTxController::CleanExpiredTxs(NTabletFlatExecutor::TTransactionContext&
}
ui64 txId = it->TxId;
LOG_S_DEBUG(TStringBuilder() << "Removing outdated txId " << txId << " max step " << it->Step << " outdated step ");
AbortTx(txId, txc);
AbortTx(*it, txc);
++removedCount;
}
}
Expand Down Expand Up @@ -292,6 +290,8 @@ TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64
}

void TTxController::OnTabletInit() {
AFL_VERIFY(!StartedFlag);
StartedFlag = true;
for (auto&& txOperator : Operators) {
txOperator.second->OnTabletInit(Owner);
}
Expand Down Expand Up @@ -373,14 +373,6 @@ void TTxController::FinishProposeOnComplete(const ui64 txId, const TActorContext
txOperator->SendReply(Owner, ctx);
}

void TTxController::StartOperators() {
AFL_VERIFY(!StartedFlag);
StartedFlag = true;
for (auto&& i : Operators) {
Y_UNUSED(i.second->OnStartAsync(Owner));
}
}

void TTxController::ITransactionOperator::SwitchStateVerified(const EStatus from, const EStatus to) {
AFL_VERIFY(!Status || *Status == from)("error", "incorrect expected status")("real_state", *Status)("expected", from)("details", DebugString());
Status = to;
Expand Down
22 changes: 10 additions & 12 deletions ydb/core/tx/columnshard/transactions/tx_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,15 @@ class TTxController {
return TxInfo;
}

virtual void DoOnTabletInit(TColumnShard& /*owner*/) {

}

void ResetStatusOnUpdate() {
Status = {};
}

virtual TString DoDebugString() const = 0;
virtual bool DoOnStartAsync(TColumnShard& /*owner*/) {
return false;
}

std::optional<bool> StartedAsync;

Expand All @@ -222,12 +223,6 @@ class TTxController {
return DoCheckTxInfoForReply(originalTxInfo);
}

[[nodiscard]] bool OnStartAsync(TColumnShard& owner) {
AFL_VERIFY(!StartedAsync);
StartedAsync = DoOnStartAsync(owner);
return *StartedAsync;
}

TString DebugString() const {
return DoDebugString();
}
Expand Down Expand Up @@ -345,7 +340,11 @@ class TTxController {
virtual void RegisterSubscriber(const TActorId&) {
AFL_VERIFY(false)("message", "Not implemented");
};
virtual void OnTabletInit(TColumnShard& /*owner*/) {}
void OnTabletInit(TColumnShard& owner) {
AFL_VERIFY(!StartedAsync);
StartedAsync = true;
DoOnTabletInit(owner);
}
};

private:
Expand All @@ -359,7 +358,7 @@ class TTxController {

private:
ui64 GetAllowedStep() const;
bool AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
bool AbortTx(const TPlanQueueItem planQueueItem, NTabletFlatExecutor::TTransactionContext& txc);

TTxInfo RegisterTx(const std::shared_ptr<TTxController::ITransactionOperator>& txOperator, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc);
TTxInfo RegisterTxWithDeadline(const std::shared_ptr<TTxController::ITransactionOperator>& txOperator, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc);
Expand All @@ -369,7 +368,6 @@ class TTxController {

ITransactionOperator::TPtr GetTxOperator(const ui64 txId) const;
ITransactionOperator::TPtr GetVerifiedTxOperator(const ui64 txId) const;
void StartOperators();

ui64 GetMemoryUsage() const;
bool HaveOutdatedTxs() const;
Expand Down

0 comments on commit a7fdd7c

Please sign in to comment.