Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move UserDb to TExecuteWriteUnit #2282

Merged
merged 2 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions ydb/core/tx/datashard/build_write_out_rs_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ EExecutionStatus TBuildWriteOutRSUnit::Execute(TOperation::TPtr op, TTransaction
DataShard.ReleaseCache(*writeOp);

if (writeOp->IsTxDataReleased()) {
switch (Pipeline.RestoreDataTx(writeOp, txc)) {
switch (Pipeline.RestoreWriteTx(writeOp, txc)) {
case ERestoreDataStatus::Ok:
break;
case ERestoreDataStatus::Restart:
Expand All @@ -72,8 +72,6 @@ EExecutionStatus TBuildWriteOutRSUnit::Execute(TOperation::TPtr op, TTransaction
return EExecutionStatus::Executed;
}

writeTx->SetReadVersion(DataShard.GetReadWriteVersions(writeOp).ReadVersion);

try {
const auto& kqpLocks = writeTx->GetKqpLocks() ? writeTx->GetKqpLocks().value() : NKikimrDataEvents::TKqpLocks{};
KqpFillOutReadSets(op->OutReadSets(), kqpLocks, true, nullptr, DataShard.SysLocksTable(), tabletId);
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,14 @@ class TDataShardEngineHost final
}

bool IsValidKey(TKeyDesc& key) const override {
TKeyValidator::TValidateOptions options(UserDb);
TKeyValidator::TValidateOptions options(
UserDb.GetLockTxId(),
UserDb.GetLockNodeId(),
UserDb.GetIsRepeatableSnapshot(),
UserDb.GetIsImmediateTx(),
UserDb.GetIsWriteTx(),
Scheme
);
return GetKeyValidator().IsValidKey(key, options);
}

Expand Down Expand Up @@ -469,7 +476,7 @@ class TDataShardEngineHost final

TEngineBay::TEngineBay(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId)
: StepTxId(stepTxId)
, KeyValidator(*self, txc.DB)
, KeyValidator(*self)
{
auto now = TAppData::TimeProvider->Now();
EngineHost = MakeHolder<TDataShardEngineHost>(self, *this, txc.DB, stepTxId.TxId, EngineHostCounters, now);
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,13 @@ ui32 TValidatedDataTx::ExtractKeys(bool allowErrors)
return KeysCount();
}

bool TValidatedDataTx::ReValidateKeys()
bool TValidatedDataTx::ReValidateKeys(const NTable::TScheme& scheme)
{
using EResult = NMiniKQL::IEngineFlat::EResult;

if (IsKqpTx()) {
TKeyValidator::TValidateOptions options(EngineBay.GetUserDb());
const auto& userDb = EngineBay.GetUserDb();
TKeyValidator::TValidateOptions options(userDb.GetLockTxId(), userDb.GetLockNodeId(), userDb.GetIsRepeatableSnapshot(), userDb.GetIsImmediateTx(), userDb.GetIsWriteTx(), scheme);
auto [result, error] = EngineBay.GetKeyValidator().ValidateKeys(options);
if (result != EResult::Ok) {
ErrStr = std::move(error);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_active_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class TValidatedDataTx : TNonCopyable, public TValidatedTx {
const NKikimrTxDataShard::TReadTableTransaction &GetReadTableTransaction() const { return Tx.GetReadTableTransaction(); }

ui32 ExtractKeys(bool allowErrors);
bool ReValidateKeys();
bool ReValidateKeys(const NTable::TScheme& scheme);

ui64 GetTxSize() const { return TxSize; }
ui32 KeysCount() const { return TxInfo().ReadsCount + TxInfo().WritesCount; }
Expand Down Expand Up @@ -468,9 +468,9 @@ class TActiveTransaction : public TOperation {
return 0;
}

bool ReValidateKeys() {
bool ReValidateKeys(const NTable::TScheme& scheme) {
if (DataTx && (DataTx->ProgramSize() || DataTx->IsKqpDataTx()))
return DataTx->ReValidateKeys();
return DataTx->ReValidateKeys(scheme);
return true;
}

Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,10 +610,10 @@ bool TPipeline::LoadWriteDetails(TTransactionContext& txc, const TActorContext&
} else if (writeOp->HasVolatilePrepareFlag()) {
// Since transaction is volatile it was never stored on disk, and it
// shouldn't have any artifacts yet.
writeOp->FillVolatileTxData(Self, txc);
writeOp->FillVolatileTxData(Self);

ui32 keysCount = 0;
keysCount = writeOp->ExtractKeys();
keysCount = writeOp->ExtractKeys(txc.DB.GetScheme());

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "LoadWriteDetails at " << Self->TabletID() << " loaded writeOp from memory " << writeOp->GetStep() << ":" << writeOp->GetTxId() << " keys extracted: " << keysCount);
} else {
Expand All @@ -631,11 +631,11 @@ bool TPipeline::LoadWriteDetails(TTransactionContext& txc, const TActorContext&
if (MaybeRequestMoreTxMemory(requiredMem, txc))
return false;

writeOp->FillTxData(Self, txc, target, txBody, std::move(locks), artifactFlags);
writeOp->FillTxData(Self, target, txBody, std::move(locks), artifactFlags);

ui32 keysCount = 0;
//if (Config.LimitActiveTx > 1)
keysCount = writeOp->ExtractKeys();
keysCount = writeOp->ExtractKeys(txc.DB.GetScheme());

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "LoadWriteDetails at " << Self->TabletID() << " loaded writeOp from db " << writeOp->GetStep() << ":" << writeOp->GetTxId() << " keys extracted: " << keysCount);
}
Expand Down Expand Up @@ -1632,7 +1632,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
{
const auto& rec = ev->Get()->Record;
TBasicOpInfo info(rec.GetTxId(), EOperationKind::WriteTx, NEvWrite::TConvertor::GetProposeFlags(rec.GetTxMode()), 0, receivedAt, tieBreakerIndex);
auto writeOp = MakeIntrusive<TWriteOperation>(info, std::move(ev), Self, txc);
auto writeOp = MakeIntrusive<TWriteOperation>(info, std::move(ev), Self);
writeOp->OperationSpan = std::move(operationSpan);
auto writeTx = writeOp->GetWriteTx();
Y_ABORT_UNLESS(writeTx);
Expand All @@ -1647,7 +1647,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
return writeOp;
}

writeTx->ExtractKeys(true);
writeTx->ExtractKeys(txc.DB.GetScheme(), true);

if (!writeTx->Ready()) {
badRequest(NEvWrite::TConvertor::ConvertErrCode(writeOp->GetWriteTx()->GetErrCode()), TStringBuilder() << "Cannot parse tx keys " << writeOp->GetTxId() << ". " << writeOp->GetWriteTx()->GetErrCode() << ": " << writeOp->GetWriteTx()->GetErrStr());
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,12 @@ class TPipeline : TNonCopyable {
return tx->RestoreTxData(Self, txc, ctx);
}

ERestoreDataStatus RestoreDataTx(
TWriteOperation* tx,
ERestoreDataStatus RestoreWriteTx(
TWriteOperation* writeOp,
TTransactionContext& txc
)
{
return tx->RestoreTxData(Self, txc);
return writeOp->RestoreTxData(Self, txc.DB);
}

void RegisterDistributedWrites(const TOperation::TPtr& op, NTable::TDatabase& db);
Expand Down
50 changes: 23 additions & 27 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@
namespace NKikimr {
namespace NDataShard {

TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev)
: UserDb(*self, txc.DB, globalTxId, TRowVersion::Min(), TRowVersion::Max(), EngineHostCounters, TAppData::TimeProvider->Now())
, KeyValidator(*self, txc.DB)
TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev)
: KeyValidator(*self)
, TabletId(self->TabletID())
, IsImmediate(ev.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE)
, GlobalTxId(globalTxId)
, ReceivedAt(receivedAt)
, TxSize(0)
, ErrCode(NKikimrTxDataShard::TError::OK)
Expand All @@ -47,18 +48,13 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc,
ComputeTxSize();
NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Add(TxSize);

UserDb.SetIsWriteTx(true);

const NKikimrDataEvents::TEvWrite& record = ev.Record;

if (record.GetLockTxId()) {
UserDb.SetLockTxId(record.GetLockTxId());
UserDb.SetLockNodeId(record.GetLockNodeId());
LockTxId = record.GetLockTxId();
LockNodeId = record.GetLockNodeId();
}

if (record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE)
UserDb.SetIsImmediateTx(true);

NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;

LOG_T("Parsing write transaction for " << globalTxId << " at " << TabletId << ", record: " << record.ShortDebugString());
Expand Down Expand Up @@ -218,12 +214,12 @@ void TValidatedWriteTx::SetTxKeys(const TUserTable& tableInfo)
}
}

ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors)
ui32 TValidatedWriteTx::ExtractKeys(const NTable::TScheme& scheme, bool allowErrors)
{
if (!HasOperations())
return 0;

bool isValid = ReValidateKeys();
bool isValid = ReValidateKeys(scheme);
if (allowErrors) {
if (!isValid) {
return 0;
Expand All @@ -235,11 +231,11 @@ ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors)
return KeysCount();
}

bool TValidatedWriteTx::ReValidateKeys()
bool TValidatedWriteTx::ReValidateKeys(const NTable::TScheme& scheme)
{
using EResult = NMiniKQL::IEngineFlat::EResult;

TKeyValidator::TValidateOptions options(UserDb);
TKeyValidator::TValidateOptions options(LockTxId, LockNodeId, false, IsImmediate, true, scheme);
auto [result, error] = GetKeyValidator().ValidateKeys(options);
if (result != EResult::Ok) {
ErrStr = std::move(error);
Expand Down Expand Up @@ -298,7 +294,7 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, ui64 tabletId)
TrackMemory();
}

TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self, TTransactionContext& txc)
TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self)
: TWriteOperation(op, self->TabletID())
{
SetTarget(ev->Sender);
Expand All @@ -310,7 +306,7 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::T
Orbit = std::move(evPtr->MoveOrbit());
WriteRequest.reset(evPtr.Release());

BuildWriteTx(self, txc);
BuildWriteTx(self);

TrackMemory();
}
Expand All @@ -329,7 +325,7 @@ void TWriteOperation::FillTxData(TValidatedWriteTx::TPtr writeTx)
WriteTx = writeTx;
}

void TWriteOperation::FillTxData(TDataShard* self, TTransactionContext& txc, const TActorId& target, const TString& txBody, const TVector<TSysTables::TLocksTable::TLock>& locks, ui64 artifactFlags)
void TWriteOperation::FillTxData(TDataShard* self, const TActorId& target, const TString& txBody, const TVector<TSysTables::TLocksTable::TLock>& locks, ui64 artifactFlags)
{
UntrackMemory();

Expand All @@ -345,20 +341,20 @@ void TWriteOperation::FillTxData(TDataShard* self, TTransactionContext& txc, con
}
ArtifactFlags = artifactFlags;
Y_ABORT_UNLESS(!WriteTx);
BuildWriteTx(self, txc);
BuildWriteTx(self);
Y_ABORT_UNLESS(WriteTx->Ready());

TrackMemory();
}

void TWriteOperation::FillVolatileTxData(TDataShard* self, TTransactionContext& txc)
void TWriteOperation::FillVolatileTxData(TDataShard* self)
{
UntrackMemory();

Y_ABORT_UNLESS(!WriteTx);
Y_ABORT_UNLESS(WriteRequest);

BuildWriteTx(self, txc);
BuildWriteTx(self);
Y_ABORT_UNLESS(WriteTx->Ready());


Expand Down Expand Up @@ -406,11 +402,11 @@ void TWriteOperation::ClearTxBody() {
TrackMemory();
}

TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self, TTransactionContext& txc)
TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self)
{
if (!WriteTx) {
Y_ABORT_UNLESS(WriteRequest);
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetGlobalTxId(), GetReceivedAt(), *WriteRequest);
WriteTx = std::make_shared<TValidatedWriteTx>(self, GetGlobalTxId(), GetReceivedAt(), *WriteRequest);
}
return WriteTx;
}
Expand Down Expand Up @@ -478,7 +474,7 @@ ui64 TWriteOperation::GetMemoryConsumption() const {
return res;
}

ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransactionContext& txc)
ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, NTable::TDatabase& db)
{
if (!WriteTx) {
ReleasedTxDataSize = 0;
Expand All @@ -493,10 +489,10 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransaction

TVector<TSysTables::TLocksTable::TLock> locks;
if (!IsImmediate() && !HasVolatilePrepareFlag()) {
NIceDb::TNiceDb db(txc.DB);
NIceDb::TNiceDb niceDb(db);

TString txBody;
bool ok = self->TransQueue.LoadTxDetails(db, GetTxId(), Target, txBody, locks, ArtifactFlags);
bool ok = self->TransQueue.LoadTxDetails(niceDb, GetTxId(), Target, txBody, locks, ArtifactFlags);
if (!ok) {
WriteRequest.reset();
ArtifactFlags = 0;
Expand All @@ -515,9 +511,9 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransaction

bool extractKeys = WriteTx->IsTxInfoLoaded();

WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetTxId(), GetReceivedAt(), *WriteRequest);
WriteTx = std::make_shared<TValidatedWriteTx>(self, GetTxId(), GetReceivedAt(), *WriteRequest);
if (WriteTx->Ready() && extractKeys) {
WriteTx->ExtractKeys(true);
WriteTx->ExtractKeys(db.GetScheme(), true);
}

if (!WriteTx->Ready()) {
Expand Down
Loading
Loading