Skip to content

Commit

Permalink
Remove TActiveTransaction from datashard_kqp (#1454)
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Feb 1, 2024
1 parent f2e86bd commit ec89d22
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 154 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
dataTx->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion);

if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) {
auto result = KqpRunTransaction(ctx, op->GetTxId(), kqpLocks, useGenericReadSets, tasksRunner);
auto result = KqpRunTransaction(ctx, op->GetTxId(), useGenericReadSets, tasksRunner);

Y_VERIFY_S(!dataTx->GetKqpComputeCtx().HadInconsistentReads(),
"Unexpected inconsistent reads in operation " << *op << " when preparing persistent channels");
Expand Down
9 changes: 1 addition & 8 deletions ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class TDataShardEngineHost final
return UserDb.GetChangeCollector(tableId);
}

void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) {
void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) override {
UserDb.CommitChanges(tableId, lockId, writeVersion);
}

Expand Down Expand Up @@ -607,13 +607,6 @@ void TEngineBay::SetIsRepeatableSnapshot() {
host->SetIsRepeatableSnapshot();
}

void TEngineBay::CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) {
Y_ABORT_UNLESS(EngineHost);

auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
host->CommitChanges(tableId, lockId, writeVersion);
}

TVector<IDataShardChangeCollector::TChange> TEngineBay::GetCollectedChanges() const {
Y_ABORT_UNLESS(EngineHost);

Expand Down
2 changes: 0 additions & 2 deletions ydb/core/tx/datashard/datashard__engine_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ class TEngineBay : TNonCopyable {
void SetIsImmediateTx();
void SetIsRepeatableSnapshot();

void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion);

TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const;
void ResetCollectedChanges();

Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ TValidatedDataTx::~TValidatedDataTx() {
NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Sub(TxSize);
}

TDataShardUserDb& TValidatedDataTx::GetUserDb() {
return EngineBay.GetUserDb();
}
const TDataShardUserDb& TValidatedDataTx::GetUserDb() const {
return EngineBay.GetUserDb();
}

ui32 TValidatedDataTx::ExtractKeys(bool allowErrors)
{
using EResult = NMiniKQL::IEngineFlat::EResult;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/datashard/datashard_active_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ using NTabletFlatExecutor::TTransactionContext;
using NTabletFlatExecutor::TTableSnapshotContext;

class TDataShard;
class TDataShardUserDb;
class TSysLocks;
struct TReadSetKey;
class TActiveTransaction;
Expand Down Expand Up @@ -173,17 +174,16 @@ class TValidatedDataTx : TNonCopyable {
const NMiniKQL::TEngineHostCounters& GetCounters() { return EngineBay.GetCounters(); }
void ResetCounters() { EngineBay.ResetCounters(); }

TDataShardUserDb& GetUserDb();
const TDataShardUserDb& GetUserDb() const;

bool CanCancel();
bool CheckCancelled(ui64 tabletId);

void SetWriteVersion(TRowVersion writeVersion) { EngineBay.SetWriteVersion(writeVersion); }
void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); }
void SetVolatileTxId(ui64 txId) { EngineBay.SetVolatileTxId(txId); }

void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) {
EngineBay.CommitChanges(tableId, lockId, writeVersion);
}

TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const { return EngineBay.GetCollectedChanges(); }
void ResetCollectedChanges() { EngineBay.ResetCollectedChanges(); }

Expand Down
Loading

0 comments on commit ec89d22

Please sign in to comment.