diff --git a/ydb/core/tx/datashard/build_write_out_rs_unit.cpp b/ydb/core/tx/datashard/build_write_out_rs_unit.cpp index 0b3ec0a12332..446507e326cb 100644 --- a/ydb/core/tx/datashard/build_write_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_write_out_rs_unit.cpp @@ -50,7 +50,7 @@ EExecutionStatus TBuildWriteOutRSUnit::Execute(TOperation::TPtr op, TTransaction DataShard.ReleaseCache(*writeOp); if (writeOp->IsTxDataReleased()) { - switch (Pipeline.RestoreDataTx(writeOp, txc)) { + switch (Pipeline.RestoreWriteTx(writeOp, txc)) { case ERestoreDataStatus::Ok: break; case ERestoreDataStatus::Restart: @@ -72,8 +72,6 @@ EExecutionStatus TBuildWriteOutRSUnit::Execute(TOperation::TPtr op, TTransaction return EExecutionStatus::Executed; } - writeTx->SetReadVersion(DataShard.GetReadWriteVersions(writeOp).ReadVersion); - try { const auto& kqpLocks = writeTx->GetKqpLocks() ? writeTx->GetKqpLocks().value() : NKikimrDataEvents::TKqpLocks{}; KqpFillOutReadSets(op->OutReadSets(), kqpLocks, true, nullptr, DataShard.SysLocksTable(), tabletId); diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 06e8caca6d84..d9966c6a2e7d 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -305,7 +305,14 @@ class TDataShardEngineHost final } bool IsValidKey(TKeyDesc& key) const override { - TKeyValidator::TValidateOptions options(UserDb); + TKeyValidator::TValidateOptions options( + UserDb.GetLockTxId(), + UserDb.GetLockNodeId(), + UserDb.GetIsRepeatableSnapshot(), + UserDb.GetIsImmediateTx(), + UserDb.GetIsWriteTx(), + Scheme + ); return GetKeyValidator().IsValidKey(key, options); } @@ -469,7 +476,7 @@ class TDataShardEngineHost final TEngineBay::TEngineBay(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId) : StepTxId(stepTxId) - , KeyValidator(*self, txc.DB) + , KeyValidator(*self) { auto now = TAppData::TimeProvider->Now(); EngineHost = MakeHolder(self, *this, txc.DB, stepTxId.TxId, EngineHostCounters, now); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 51a93b67dd71..ed7430fce86f 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -229,12 +229,13 @@ ui32 TValidatedDataTx::ExtractKeys(bool allowErrors) return KeysCount(); } -bool TValidatedDataTx::ReValidateKeys() +bool TValidatedDataTx::ReValidateKeys(const NTable::TScheme& scheme) { using EResult = NMiniKQL::IEngineFlat::EResult; if (IsKqpTx()) { - TKeyValidator::TValidateOptions options(EngineBay.GetUserDb()); + const auto& userDb = EngineBay.GetUserDb(); + TKeyValidator::TValidateOptions options(userDb.GetLockTxId(), userDb.GetLockNodeId(), userDb.GetIsRepeatableSnapshot(), userDb.GetIsImmediateTx(), userDb.GetIsWriteTx(), scheme); auto [result, error] = EngineBay.GetKeyValidator().ValidateKeys(options); if (result != EResult::Ok) { ErrStr = std::move(error); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index a654b191a2d4..8783c5866e46 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -266,7 +266,7 @@ class TValidatedDataTx : TNonCopyable, public TValidatedTx { const NKikimrTxDataShard::TReadTableTransaction &GetReadTableTransaction() const { return Tx.GetReadTableTransaction(); } ui32 ExtractKeys(bool allowErrors); - bool ReValidateKeys(); + bool ReValidateKeys(const NTable::TScheme& scheme); ui64 GetTxSize() const { return TxSize; } ui32 KeysCount() const { return TxInfo().ReadsCount + TxInfo().WritesCount; } @@ -468,9 +468,9 @@ class TActiveTransaction : public TOperation { return 0; } - bool ReValidateKeys() { + bool ReValidateKeys(const NTable::TScheme& scheme) { if (DataTx && (DataTx->ProgramSize() || DataTx->IsKqpDataTx())) - return DataTx->ReValidateKeys(); + return DataTx->ReValidateKeys(scheme); return true; } diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 98d86bae7d4f..63273041deba 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -610,10 +610,10 @@ bool TPipeline::LoadWriteDetails(TTransactionContext& txc, const TActorContext& } else if (writeOp->HasVolatilePrepareFlag()) { // Since transaction is volatile it was never stored on disk, and it // shouldn't have any artifacts yet. - writeOp->FillVolatileTxData(Self, txc); + writeOp->FillVolatileTxData(Self); ui32 keysCount = 0; - keysCount = writeOp->ExtractKeys(); + keysCount = writeOp->ExtractKeys(txc.DB.GetScheme()); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "LoadWriteDetails at " << Self->TabletID() << " loaded writeOp from memory " << writeOp->GetStep() << ":" << writeOp->GetTxId() << " keys extracted: " << keysCount); } else { @@ -631,11 +631,11 @@ bool TPipeline::LoadWriteDetails(TTransactionContext& txc, const TActorContext& if (MaybeRequestMoreTxMemory(requiredMem, txc)) return false; - writeOp->FillTxData(Self, txc, target, txBody, std::move(locks), artifactFlags); + writeOp->FillTxData(Self, target, txBody, std::move(locks), artifactFlags); ui32 keysCount = 0; //if (Config.LimitActiveTx > 1) - keysCount = writeOp->ExtractKeys(); + keysCount = writeOp->ExtractKeys(txc.DB.GetScheme()); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "LoadWriteDetails at " << Self->TabletID() << " loaded writeOp from db " << writeOp->GetStep() << ":" << writeOp->GetTxId() << " keys extracted: " << keysCount); } @@ -1632,7 +1632,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& { const auto& rec = ev->Get()->Record; TBasicOpInfo info(rec.GetTxId(), EOperationKind::WriteTx, NEvWrite::TConvertor::GetProposeFlags(rec.GetTxMode()), 0, receivedAt, tieBreakerIndex); - auto writeOp = MakeIntrusive(info, std::move(ev), Self, txc); + auto writeOp = MakeIntrusive(info, std::move(ev), Self); writeOp->OperationSpan = std::move(operationSpan); auto writeTx = writeOp->GetWriteTx(); Y_ABORT_UNLESS(writeTx); @@ -1647,7 +1647,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& return writeOp; } - writeTx->ExtractKeys(true); + writeTx->ExtractKeys(txc.DB.GetScheme(), true); if (!writeTx->Ready()) { badRequest(NEvWrite::TConvertor::ConvertErrCode(writeOp->GetWriteTx()->GetErrCode()), TStringBuilder() << "Cannot parse tx keys " << writeOp->GetTxId() << ". " << writeOp->GetWriteTx()->GetErrCode() << ": " << writeOp->GetWriteTx()->GetErrStr()); diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index 6a2d3ef0d211..e421184b0550 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -277,12 +277,12 @@ class TPipeline : TNonCopyable { return tx->RestoreTxData(Self, txc, ctx); } - ERestoreDataStatus RestoreDataTx( - TWriteOperation* tx, + ERestoreDataStatus RestoreWriteTx( + TWriteOperation* writeOp, TTransactionContext& txc ) { - return tx->RestoreTxData(Self, txc); + return writeOp->RestoreTxData(Self, txc.DB); } void RegisterDistributedWrites(const TOperation::TPtr& op, NTable::TDatabase& db); diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index a7f5d45f57fc..b54cee3b0e52 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -35,10 +35,11 @@ namespace NKikimr { namespace NDataShard { -TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev) - : UserDb(*self, txc.DB, globalTxId, TRowVersion::Min(), TRowVersion::Max(), EngineHostCounters, TAppData::TimeProvider->Now()) - , KeyValidator(*self, txc.DB) +TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev) + : KeyValidator(*self) , TabletId(self->TabletID()) + , IsImmediate(ev.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) + , GlobalTxId(globalTxId) , ReceivedAt(receivedAt) , TxSize(0) , ErrCode(NKikimrTxDataShard::TError::OK) @@ -47,18 +48,13 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ComputeTxSize(); NActors::NMemory::TLabel::Add(TxSize); - UserDb.SetIsWriteTx(true); - const NKikimrDataEvents::TEvWrite& record = ev.Record; if (record.GetLockTxId()) { - UserDb.SetLockTxId(record.GetLockTxId()); - UserDb.SetLockNodeId(record.GetLockNodeId()); + LockTxId = record.GetLockTxId(); + LockNodeId = record.GetLockNodeId(); } - if (record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) - UserDb.SetIsImmediateTx(true); - NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; LOG_T("Parsing write transaction for " << globalTxId << " at " << TabletId << ", record: " << record.ShortDebugString()); @@ -218,12 +214,12 @@ void TValidatedWriteTx::SetTxKeys(const TUserTable& tableInfo) } } -ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors) +ui32 TValidatedWriteTx::ExtractKeys(const NTable::TScheme& scheme, bool allowErrors) { if (!HasOperations()) return 0; - bool isValid = ReValidateKeys(); + bool isValid = ReValidateKeys(scheme); if (allowErrors) { if (!isValid) { return 0; @@ -235,11 +231,11 @@ ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors) return KeysCount(); } -bool TValidatedWriteTx::ReValidateKeys() +bool TValidatedWriteTx::ReValidateKeys(const NTable::TScheme& scheme) { using EResult = NMiniKQL::IEngineFlat::EResult; - TKeyValidator::TValidateOptions options(UserDb); + TKeyValidator::TValidateOptions options(LockTxId, LockNodeId, false, IsImmediate, true, scheme); auto [result, error] = GetKeyValidator().ValidateKeys(options); if (result != EResult::Ok) { ErrStr = std::move(error); @@ -298,7 +294,7 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, ui64 tabletId) TrackMemory(); } -TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self, TTransactionContext& txc) +TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self) : TWriteOperation(op, self->TabletID()) { SetTarget(ev->Sender); @@ -310,7 +306,7 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::T Orbit = std::move(evPtr->MoveOrbit()); WriteRequest.reset(evPtr.Release()); - BuildWriteTx(self, txc); + BuildWriteTx(self); TrackMemory(); } @@ -329,7 +325,7 @@ void TWriteOperation::FillTxData(TValidatedWriteTx::TPtr writeTx) WriteTx = writeTx; } -void TWriteOperation::FillTxData(TDataShard* self, TTransactionContext& txc, const TActorId& target, const TString& txBody, const TVector& locks, ui64 artifactFlags) +void TWriteOperation::FillTxData(TDataShard* self, const TActorId& target, const TString& txBody, const TVector& locks, ui64 artifactFlags) { UntrackMemory(); @@ -345,20 +341,20 @@ void TWriteOperation::FillTxData(TDataShard* self, TTransactionContext& txc, con } ArtifactFlags = artifactFlags; Y_ABORT_UNLESS(!WriteTx); - BuildWriteTx(self, txc); + BuildWriteTx(self); Y_ABORT_UNLESS(WriteTx->Ready()); TrackMemory(); } -void TWriteOperation::FillVolatileTxData(TDataShard* self, TTransactionContext& txc) +void TWriteOperation::FillVolatileTxData(TDataShard* self) { UntrackMemory(); Y_ABORT_UNLESS(!WriteTx); Y_ABORT_UNLESS(WriteRequest); - BuildWriteTx(self, txc); + BuildWriteTx(self); Y_ABORT_UNLESS(WriteTx->Ready()); @@ -406,11 +402,11 @@ void TWriteOperation::ClearTxBody() { TrackMemory(); } -TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self, TTransactionContext& txc) +TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self) { if (!WriteTx) { Y_ABORT_UNLESS(WriteRequest); - WriteTx = std::make_shared(self, txc, GetGlobalTxId(), GetReceivedAt(), *WriteRequest); + WriteTx = std::make_shared(self, GetGlobalTxId(), GetReceivedAt(), *WriteRequest); } return WriteTx; } @@ -478,7 +474,7 @@ ui64 TWriteOperation::GetMemoryConsumption() const { return res; } -ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransactionContext& txc) +ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, NTable::TDatabase& db) { if (!WriteTx) { ReleasedTxDataSize = 0; @@ -493,10 +489,10 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransaction TVector locks; if (!IsImmediate() && !HasVolatilePrepareFlag()) { - NIceDb::TNiceDb db(txc.DB); + NIceDb::TNiceDb niceDb(db); TString txBody; - bool ok = self->TransQueue.LoadTxDetails(db, GetTxId(), Target, txBody, locks, ArtifactFlags); + bool ok = self->TransQueue.LoadTxDetails(niceDb, GetTxId(), Target, txBody, locks, ArtifactFlags); if (!ok) { WriteRequest.reset(); ArtifactFlags = 0; @@ -515,9 +511,9 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransaction bool extractKeys = WriteTx->IsTxInfoLoaded(); - WriteTx = std::make_shared(self, txc, GetTxId(), GetReceivedAt(), *WriteRequest); + WriteTx = std::make_shared(self, GetTxId(), GetReceivedAt(), *WriteRequest); if (WriteTx->Ready() && extractKeys) { - WriteTx->ExtractKeys(true); + WriteTx->ExtractKeys(db.GetScheme(), true); } if (!WriteTx->Ready()) { diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h index 5977d5f93559..cdf75e20d590 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.h +++ b/ydb/core/tx/datashard/datashard_write_operation.h @@ -19,7 +19,7 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx { public: using TPtr = std::shared_ptr; - TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev); + TValidatedWriteTx(TDataShard* self, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev); ~TValidatedWriteTx(); EType GetType() const override { @@ -31,18 +31,10 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx { } ui64 GetTxId() const override { - return UserDb.GetGlobalTxId(); + return GlobalTxId; } - ui64 LockTxId() const { - return UserDb.GetLockTxId(); - } - ui32 LockNodeId() const { - return UserDb.GetLockNodeId(); - } - bool Immediate() const { - return UserDb.GetIsImmediateTx(); - } + bool NeedDiagnostics() const { return true; } @@ -59,7 +51,7 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx { return TxInfo().HasWrites(); } bool HasLockedWrites() const { - return HasWrites() && LockTxId(); + return HasWrites() && LockTxId; } bool HasDynamicWrites() const { return TxInfo().DynKeysCount != 0; @@ -72,51 +64,12 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx { return KeyValidator; } - TDataShardUserDb& GetUserDb() { - return UserDb; - } - - const TDataShardUserDb& GetUserDb() const { - return UserDb; - } - bool CanCancel(); bool CheckCancelled(); - void SetWriteVersion(TRowVersion writeVersion) { - UserDb.SetWriteVersion(writeVersion); - } - void SetReadVersion(TRowVersion readVersion) { - UserDb.SetReadVersion(readVersion); - } - - void SetVolatileTxId(ui64 txId) { - UserDb.SetVolatileTxId(txId); - } - - TVector GetCollectedChanges() const { - return UserDb.GetCollectedChanges(); - } - void ResetCollectedChanges() { - UserDb.ResetCollectedChanges(); - } + ui32 ExtractKeys(const NTable::TScheme& scheme, bool allowErrors); + bool ReValidateKeys(const NTable::TScheme& scheme); - TVector GetVolatileCommitTxIds() const { - return UserDb.GetVolatileCommitTxIds(); - } - const absl::flat_hash_set& GetVolatileDependencies() const { - return UserDb.GetVolatileDependencies(); - } - std::optional GetVolatileChangeGroup() { - return UserDb.GetChangeGroup(); - } - bool GetVolatileCommitOrdered() const { - return UserDb.GetVolatileCommitOrdered(); - } - - ui32 ExtractKeys(bool allowErrors); - bool ReValidateKeys(); - ui64 HasOperations() const { return Matrix.GetRowCount() != 0; } @@ -153,12 +106,15 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx { void ComputeTxSize(); private: - TDataShardUserDb UserDb; TKeyValidator KeyValidator; - NMiniKQL::TEngineHostCounters EngineHostCounters; const ui64 TabletId; + const bool IsImmediate; + + YDB_READONLY_DEF(ui64, LockTxId); + YDB_READONLY_DEF(ui32, LockNodeId); + YDB_READONLY_DEF(ui64, GlobalTxId); YDB_READONLY_DEF(TTableId, TableId); YDB_READONLY_DEF(std::optional, KqpLocks); YDB_READONLY_DEF(std::vector, ColumnIds); @@ -180,12 +136,12 @@ class TWriteOperation : public TOperation { static TWriteOperation* CastWriteOperation(TOperation::TPtr op); explicit TWriteOperation(const TBasicOpInfo& op, ui64 tabletId); - explicit TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self, TTransactionContext& txc); + explicit TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self); ~TWriteOperation(); void FillTxData(TValidatedWriteTx::TPtr dataTx); - void FillTxData(TDataShard* self, TTransactionContext& txc, const TActorId& target, const TString& txBody, const TVector& locks, ui64 artifactFlags); - void FillVolatileTxData(TDataShard* self, TTransactionContext& txc); + void FillTxData(TDataShard* self, const TActorId& target, const TString& txBody, const TVector& locks, ui64 artifactFlags); + void FillVolatileTxData(TDataShard* self); TString GetTxBody() const; void SetTxBody(const TString& txBody); @@ -197,12 +153,12 @@ class TWriteOperation : public TOperation { TOperation::Deactivate(); } - ui32 ExtractKeys() { - return WriteTx ? WriteTx->ExtractKeys(false) : 0; + ui32 ExtractKeys(const NTable::TScheme& scheme) { + return WriteTx ? WriteTx->ExtractKeys(scheme, false) : 0; } - bool ReValidateKeys() { - return WriteTx ? WriteTx->ReValidateKeys() : true; + bool ReValidateKeys(const NTable::TScheme& scheme) { + return WriteTx ? WriteTx->ReValidateKeys(scheme) : true; } void MarkAsUsingSnapshot() { @@ -247,7 +203,7 @@ class TWriteOperation : public TOperation { } void ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& provider); - ERestoreDataStatus RestoreTxData(TDataShard* self, TTransactionContext& txc); + ERestoreDataStatus RestoreTxData(TDataShard* self, NTable::TDatabase& db); // TOperation iface. void BuildExecutionPlan(bool loaded) override; @@ -267,11 +223,11 @@ class TWriteOperation : public TOperation { } ui64 LockTxId() const override { - return WriteTx ? WriteTx->LockTxId() : 0; + return WriteTx ? WriteTx->GetLockTxId() : 0; } ui32 LockNodeId() const override { - return WriteTx ? WriteTx->LockNodeId() : 0; + return WriteTx ? WriteTx->GetLockNodeId() : 0; } bool HasLockedWrites() const override { @@ -288,7 +244,7 @@ class TWriteOperation : public TOperation { TValidatedWriteTx::TPtr& GetWriteTx() { return WriteTx; } - TValidatedWriteTx::TPtr BuildWriteTx(TDataShard* self, TTransactionContext& txc); + TValidatedWriteTx::TPtr BuildWriteTx(TDataShard* self); void ClearWriteTx() { WriteTx = nullptr; diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index 00a0baa51b69..81f5396cd178 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -104,7 +104,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, IEngineFlat* engine = tx->GetDataTx()->GetEngine(); Y_VERIFY_S(engine, "missing engine for " << *op << " at " << DataShard.TabletID()); - if (op->IsImmediate() && !tx->ReValidateKeys()) { + if (op->IsImmediate() && !tx->ReValidateKeys(txc.DB.GetScheme())) { // Immediate transactions may be reordered with schema changes and become invalid const auto& dataTx = tx->GetDataTx(); Y_ABORT_UNLESS(!dataTx->Ready()); diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 3808fc2d4d4d..e315b6941631 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -107,7 +107,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio ui64 tabletId = DataShard.TabletID(); const TValidatedDataTx::TPtr& dataTx = tx->GetDataTx(); - if (op->IsImmediate() && !dataTx->ReValidateKeys()) { + if (op->IsImmediate() && !dataTx->ReValidateKeys(txc.DB.GetScheme())) { // Immediate transactions may be reordered with schema changes and become invalid Y_ABORT_UNLESS(!dataTx->Ready()); op->SetAbortedFlag(); diff --git a/ydb/core/tx/datashard/execute_write_unit.cpp b/ydb/core/tx/datashard/execute_write_unit.cpp index 7e2fc8fdbc54..85a30bbecb1e 100644 --- a/ydb/core/tx/datashard/execute_write_unit.cpp +++ b/ydb/core/tx/datashard/execute_write_unit.cpp @@ -55,19 +55,19 @@ class TExecuteWriteUnit : public TExecutionUnit { DataShard.SubscribeNewLocks(ctx); } - EExecutionStatus OnTabletNotReady(TWriteOperation& writeOp, TValidatedWriteTx& writeTx, TTransactionContext& txc, const TActorContext& ctx) + EExecutionStatus OnTabletNotReady(TDataShardUserDb& userDb, TWriteOperation& writeOp, TTransactionContext& txc, const TActorContext& ctx) { LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Tablet " << DataShard.TabletID() << " is not ready for " << writeOp << " execution"); DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY); - writeTx.ResetCollectedChanges(); + userDb.ResetCollectedChanges(); writeOp.ReleaseTxData(txc); return EExecutionStatus::Restart; } - void DoUpdateToUserDb(TWriteOperation* writeOp, TTransactionContext& txc, const TActorContext& ctx) { + void DoUpdateToUserDb(TDataShardUserDb& userDb, TWriteOperation* writeOp, TTransactionContext& txc, const TActorContext& ctx) { TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx(); if (!writeTx->HasOperations()) { @@ -122,7 +122,7 @@ class TExecuteWriteUnit : public TExecutionUnit { ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeInfo)); } - writeTx->GetUserDb().UpdateRow(fullTableId, key, ops); + userDb.UpdateRow(fullTableId, key, ops); } DataShard.IncCounter(COUNTER_WRITE_ROWS, matrix.GetRowCount()); @@ -152,10 +152,9 @@ class TExecuteWriteUnit : public TExecutionUnit { const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx(); DataShard.ReleaseCache(*writeOp); - writeTx->GetUserDb().ResetCounters(); if (writeOp->IsTxDataReleased()) { - switch (Pipeline.RestoreDataTx(writeOp, txc)) { + switch (Pipeline.RestoreWriteTx(writeOp, txc)) { case ERestoreDataStatus::Ok: break; @@ -180,7 +179,7 @@ class TExecuteWriteUnit : public TExecutionUnit { ui64 tabletId = DataShard.TabletID(); - if (op->IsImmediate() && !writeOp->ReValidateKeys()) { + if (op->IsImmediate() && !writeOp->ReValidateKeys(txc.DB.GetScheme())) { // Immediate transactions may be reordered with schema changes and become invalid Y_ABORT_UNLESS(!writeTx->Ready()); writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, writeTx->GetErrStr()); @@ -194,8 +193,21 @@ class TExecuteWriteUnit : public TExecutionUnit { return EExecutionStatus::Executed; } + NMiniKQL::TEngineHostCounters engineHostCounters; + const ui64 txId = writeTx->GetTxId(); + const auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(writeOp); + + TDataShardUserDb userDb(DataShard, txc.DB, op->GetGlobalTxId(), readVersion, writeVersion, engineHostCounters, TAppData::TimeProvider->Now()); + userDb.SetIsWriteTx(true); + userDb.SetIsImmediateTx(op->IsImmediate()); + userDb.SetLockTxId(writeTx->GetLockTxId()); + userDb.SetLockNodeId(writeTx->GetLockNodeId()); + + if (op->HasVolatilePrepareFlag()) { + userDb.SetVolatileTxId(txId); + } + try { - const ui64 txId = writeTx->GetTxId(); const auto* kqpLocks = writeTx->GetKqpLocks() ? &writeTx->GetKqpLocks().value() : nullptr; const auto& inReadSets = op->InReadSets(); auto& awaitingDecisions = op->AwaitingDecisions(); @@ -273,18 +285,9 @@ class TExecuteWriteUnit : public TExecutionUnit { return EExecutionStatus::Executed; } - auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(writeOp); - writeTx->SetReadVersion(readVersion); - writeTx->SetWriteVersion(writeVersion); - - if (op->HasVolatilePrepareFlag()) { - writeTx->SetVolatileTxId(txId); - } - - - KqpCommitLocks(tabletId, kqpLocks, sysLocks, writeVersion, writeTx->GetUserDb()); + KqpCommitLocks(tabletId, kqpLocks, sysLocks, writeVersion, userDb); - DoUpdateToUserDb(writeOp, txc, ctx); + DoUpdateToUserDb(userDb, writeOp, txc, ctx); writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCompleted(DataShard.TabletID(), writeOp->GetTxId())); @@ -294,7 +297,7 @@ class TExecuteWriteUnit : public TExecutionUnit { writeResult->Record.SetStep(op->GetStep()); if (Pipeline.AddLockDependencies(op, guardLocks)) { - writeTx->ResetCollectedChanges(); + userDb.ResetCollectedChanges(); writeOp->ReleaseTxData(txc); if (txc.DB.HasChanges()) { txc.DB.RollbackChanges(); @@ -304,17 +307,17 @@ class TExecuteWriteUnit : public TExecutionUnit { // Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies // Such transactions would have no participants and become immediately committed - auto commitTxIds = writeTx->GetVolatileCommitTxIds(); + auto commitTxIds = userDb.GetVolatileCommitTxIds(); if (commitTxIds) { TVector participants(awaitingDecisions.begin(), awaitingDecisions.end()); DataShard.GetVolatileTxManager().PersistAddVolatileTx( txId, writeVersion, commitTxIds, - writeTx->GetVolatileDependencies(), + userDb.GetVolatileDependencies(), participants, - writeTx->GetVolatileChangeGroup(), - writeTx->GetVolatileCommitOrdered(), + userDb.GetChangeGroup(), + userDb.GetVolatileCommitOrdered(), txc ); } @@ -335,11 +338,11 @@ class TExecuteWriteUnit : public TExecutionUnit { // Note: may erase persistent locks, must be after we persist volatile tx AddLocksToResult(writeOp, ctx); - if (auto changes = std::move(writeTx->GetCollectedChanges())) { + if (auto changes = std::move(userDb.GetCollectedChanges())) { op->ChangeRecords() = std::move(changes); } - auto& counters = writeTx->GetUserDb().GetCounters(); + const auto& counters = userDb.GetCounters(); KqpUpdateDataShardStatCounters(DataShard, counters); KqpFillTxStats(DataShard, counters, *writeResult->Record.MutableTxStats()); @@ -359,9 +362,9 @@ class TExecuteWriteUnit : public TExecutionUnit { } return EExecutionStatus::Continue; } catch (const TNotReadyTabletException&) { - return OnTabletNotReady(*writeOp, *writeTx, txc, ctx); + return OnTabletNotReady(userDb, *writeOp, txc, ctx); } catch (const TLockedWriteLimitException&) { - writeTx->ResetCollectedChanges(); + userDb.ResetCollectedChanges(); writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Shard " << DataShard.TabletID() << " cannot write more uncommitted changes"); diff --git a/ydb/core/tx/datashard/key_validator.cpp b/ydb/core/tx/datashard/key_validator.cpp index ee9f9b5d1d4c..6895b5e74cac 100644 --- a/ydb/core/tx/datashard/key_validator.cpp +++ b/ydb/core/tx/datashard/key_validator.cpp @@ -12,9 +12,8 @@ using namespace NKikimr; using namespace NKikimr::NDataShard; -TKeyValidator::TKeyValidator(const TDataShard& self, const NTable::TDatabase& db) +TKeyValidator::TKeyValidator(const TDataShard& self) : Self(self) - , Db(db) { } @@ -65,12 +64,19 @@ void TKeyValidator::AddWriteRange(const TTableId& tableId, const TTableRange& ra Info.SetLoaded(); } -TKeyValidator::TValidateOptions::TValidateOptions(const TDataShardUserDb& userDb) - : IsLockTxId(static_cast(userDb.GetLockTxId())) - , IsLockNodeId(static_cast(userDb.GetLockNodeId())) - , IsRepeatableSnapshot(userDb.GetIsRepeatableSnapshot()) - , IsImmediateTx(userDb.GetIsImmediateTx()) - , IsWriteTx(userDb.GetIsWriteTx()) +TKeyValidator::TValidateOptions::TValidateOptions( + ui64 LockTxId, + ui32 LockNodeId, + bool isRepeatableSnapshot, + bool isImmediateTx, + bool isWriteTx, + const NTable::TScheme& scheme) + : IsLockTxId(static_cast(LockTxId)) + , IsLockNodeId(static_cast(LockNodeId)) + , IsRepeatableSnapshot(isRepeatableSnapshot) + , IsImmediateTx(isImmediateTx) + , IsWriteTx(isWriteTx) + , Scheme(scheme) { } @@ -94,7 +100,7 @@ bool TKeyValidator::IsValidKey(TKeyDesc& key, const TValidateOptions& opt) const } ui64 localTableId = Self.GetLocalTableId(key.TableId); - return NMiniKQL::IsValidKey(Db.GetScheme(), localTableId, key); + return NMiniKQL::IsValidKey(opt.Scheme, localTableId, key); } ui64 TKeyValidator::GetTableSchemaVersion(const TTableId& tableId) const { diff --git a/ydb/core/tx/datashard/key_validator.h b/ydb/core/tx/datashard/key_validator.h index 9b3edd0c667b..3a7070e469ed 100644 --- a/ydb/core/tx/datashard/key_validator.h +++ b/ydb/core/tx/datashard/key_validator.h @@ -13,7 +13,7 @@ class TDataShardUserDb; class TKeyValidator { public: - TKeyValidator(const TDataShard& self, const NTable::TDatabase& db); + TKeyValidator(const TDataShard& self); struct TColumnWriteMeta { NTable::TColumn Column; @@ -29,8 +29,14 @@ class TKeyValidator { bool IsRepeatableSnapshot; bool IsImmediateTx; bool IsWriteTx; - - TValidateOptions(const TDataShardUserDb& userDb); + const NTable::TScheme& Scheme; + + TValidateOptions(ui64 LockTxId, + ui32 LockNodeId, + bool isRepeatableSnapshot, + bool isImmediateTx, + bool isWriteTx, + const NTable::TScheme& scheme); }; bool IsValidKey(TKeyDesc& key, const TValidateOptions& options) const; @@ -42,7 +48,6 @@ class TKeyValidator { const NMiniKQL::IEngineFlat::TValidationInfo& GetInfo() const; private: const TDataShard& Self; - const NTable::TDatabase& Db; NMiniKQL::IEngineFlat::TValidationInfo Info; }; diff --git a/ydb/core/tx/datashard/prepare_write_tx_in_rs_unit.cpp b/ydb/core/tx/datashard/prepare_write_tx_in_rs_unit.cpp index 5a8add513814..2485ec448846 100644 --- a/ydb/core/tx/datashard/prepare_write_tx_in_rs_unit.cpp +++ b/ydb/core/tx/datashard/prepare_write_tx_in_rs_unit.cpp @@ -38,7 +38,7 @@ EExecutionStatus TPrepareWriteTxInRSUnit::Execute(TOperation::TPtr op, TTransact const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx(); if (writeOp->IsTxDataReleased()) { - switch (Pipeline.RestoreDataTx(writeOp, txc)) { + switch (Pipeline.RestoreWriteTx(writeOp, txc)) { case ERestoreDataStatus::Ok: break; case ERestoreDataStatus::Restart: