Skip to content

Commit

Permalink
Merge 5822a96 into 43e2061
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 17, 2024
2 parents 43e2061 + 5822a96 commit e429994
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 13 deletions.
36 changes: 23 additions & 13 deletions ydb/core/tx/columnshard/columnshard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,39 @@
#include "columnshard_schema.h"

#include <ydb/core/tx/columnshard/operations/write.h>

#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>

namespace NKikimr::NColumnShard {

class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
private:
bool AbortedThroughRemoveExpired = false;
TTxController::ITransactionOperator::TPtr TxOperator;
const ui32 TabletTxNo;
std::optional<NOlap::TSnapshot> LastCompletedTx;
std::optional<TTxController::TPlanQueueItem> PlannedQueueItem;

public:
TTxProgressTx(TColumnShard* self)
: TTransactionBase(self)
, TabletTxNo(++Self->TabletTxCounter)
{}
, TabletTxNo(++Self->TabletTxCounter) {
}

TTxType GetTxType() const override { return TXTYPE_PROGRESS; }
TTxType GetTxType() const override {
return TXTYPE_PROGRESS;
}

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
Y_ABORT_UNLESS(Self->ProgressTxInFlight);
Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TX_COMPLETE_LAG, Self->GetTxCompleteLag().MilliSeconds());

const size_t removedCount = Self->ProgressTxController->CleanExpiredTxs(txc);
if (removedCount > 0) {
// We cannot continue with this transaction, start a new transaction
AbortedThroughRemoveExpired = true;
Self->Execute(new TTxProgressTx(Self), ctx);
return true;
}
Expand All @@ -49,7 +61,11 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
}

void Complete(const TActorContext& ctx) override {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
if (AbortedThroughRemoveExpired) {
return;
}
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
if (TxOperator) {
TxOperator->ProgressOnComplete(*Self, ctx);
Self->RescheduleWaitingReads();
Expand All @@ -66,12 +82,6 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
}
Self->SetupIndexation();
}

private:
TTxController::ITransactionOperator::TPtr TxOperator;
const ui32 TabletTxNo;
std::optional<NOlap::TSnapshot> LastCompletedTx;
std::optional<TTxController::TPlanQueueItem> PlannedQueueItem;
};

void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) {
Expand Down Expand Up @@ -102,4 +112,4 @@ void TColumnShard::Handle(TEvColumnShard::TEvCheckPlannedTransaction::TPtr& ev,
// For now do not return result for not finished tx. It would be sent in TTxProgressTx::Complete()
}

}
} // namespace NKikimr::NColumnShard
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ namespace NKikimr::NColumnShard {
return "LONG_TX_WRITE";
}

bool TxWithDeadline() const override {
return true;
}

virtual TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override;
virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {

Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tx/columnshard/transactions/tx_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
return false;
}

ui32 countWithDeadline = 0;
ui32 countOverrideDeadline = 0;
ui32 countNoDeadline = 0;
while (!rowset.EndOfSet()) {
const ui64 txId = rowset.GetValue<Schema::TxInfo::TxId>();
const NKikimrTxColumnShard::ETransactionKind txKind = rowset.GetValue<Schema::TxInfo::TxKind>();
Expand All @@ -58,6 +61,13 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
txInfo.MaxStep = rowset.GetValue<Schema::TxInfo::MaxStep>();
if (txInfo.MaxStep != Max<ui64>()) {
txInfo.MinStep = txInfo.MaxStep - MaxCommitTxDelay.MilliSeconds();
++countWithDeadline;
} else if (txOperator->TxWithDeadline()) {
txInfo.MinStep = GetAllowedStep();
txInfo.MaxStep = txInfo.MinStep + MaxCommitTxDelay.MilliSeconds();
++countOverrideDeadline;
} else {
++countNoDeadline;
}
txInfo.PlanStep = rowset.GetValueOrDefault<Schema::TxInfo::PlanStep>(0);
txInfo.Source = rowset.GetValue<Schema::TxInfo::Source>();
Expand All @@ -75,6 +85,8 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
return false;
}
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("override", countOverrideDeadline)("no_dl", countNoDeadline)("dl", countWithDeadline)(
"operators", Operators.size())("plan", PlanQueue.size())("dl_queue", DeadlineQueue.size());
return true;
}

Expand Down

0 comments on commit e429994

Please sign in to comment.