Skip to content

Commit

Permalink
Merge e0805ca into 5b9ad59
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Feb 28, 2024
2 parents 5b9ad59 + e0805ca commit 40d870f
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 156 deletions.
4 changes: 1 addition & 3 deletions ydb/core/tx/datashard/build_write_out_rs_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ EExecutionStatus TBuildWriteOutRSUnit::Execute(TOperation::TPtr op, TTransaction
DataShard.ReleaseCache(*writeOp);

if (writeOp->IsTxDataReleased()) {
switch (Pipeline.RestoreDataTx(writeOp, txc)) {
switch (Pipeline.RestoreWriteTx(writeOp, txc)) {
case ERestoreDataStatus::Ok:
break;
case ERestoreDataStatus::Restart:
Expand All @@ -72,8 +72,6 @@ EExecutionStatus TBuildWriteOutRSUnit::Execute(TOperation::TPtr op, TTransaction
return EExecutionStatus::Executed;
}

writeTx->SetReadVersion(DataShard.GetReadWriteVersions(writeOp).ReadVersion);

try {
const auto& kqpLocks = writeTx->GetKqpLocks() ? writeTx->GetKqpLocks().value() : NKikimrDataEvents::TKqpLocks{};
KqpFillOutReadSets(op->OutReadSets(), kqpLocks, true, nullptr, DataShard.SysLocksTable(), tabletId);
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,14 @@ class TDataShardEngineHost final
}

bool IsValidKey(TKeyDesc& key) const override {
TKeyValidator::TValidateOptions options(UserDb);
TKeyValidator::TValidateOptions options(
UserDb.GetLockTxId(),
UserDb.GetLockNodeId(),
UserDb.GetIsRepeatableSnapshot(),
UserDb.GetIsImmediateTx(),
UserDb.GetIsWriteTx(),
Scheme
);
return GetKeyValidator().IsValidKey(key, options);
}

Expand Down Expand Up @@ -469,7 +476,7 @@ class TDataShardEngineHost final

TEngineBay::TEngineBay(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId)
: StepTxId(stepTxId)
, KeyValidator(*self, txc.DB)
, KeyValidator(*self)
{
auto now = TAppData::TimeProvider->Now();
EngineHost = MakeHolder<TDataShardEngineHost>(self, *this, txc.DB, stepTxId.TxId, EngineHostCounters, now);
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,13 @@ ui32 TValidatedDataTx::ExtractKeys(bool allowErrors)
return KeysCount();
}

bool TValidatedDataTx::ReValidateKeys()
bool TValidatedDataTx::ReValidateKeys(const NTable::TScheme& scheme)
{
using EResult = NMiniKQL::IEngineFlat::EResult;

if (IsKqpTx()) {
TKeyValidator::TValidateOptions options(EngineBay.GetUserDb());
const auto& userDb = EngineBay.GetUserDb();
TKeyValidator::TValidateOptions options(userDb.GetLockTxId(), userDb.GetLockNodeId(), userDb.GetIsRepeatableSnapshot(), userDb.GetIsImmediateTx(), userDb.GetIsWriteTx(), scheme);
auto [result, error] = EngineBay.GetKeyValidator().ValidateKeys(options);
if (result != EResult::Ok) {
ErrStr = std::move(error);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_active_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class TValidatedDataTx : TNonCopyable, public TValidatedTx {
const NKikimrTxDataShard::TReadTableTransaction &GetReadTableTransaction() const { return Tx.GetReadTableTransaction(); }

ui32 ExtractKeys(bool allowErrors);
bool ReValidateKeys();
bool ReValidateKeys(const NTable::TScheme& scheme);

ui64 GetTxSize() const { return TxSize; }
ui32 KeysCount() const { return TxInfo().ReadsCount + TxInfo().WritesCount; }
Expand Down Expand Up @@ -468,9 +468,9 @@ class TActiveTransaction : public TOperation {
return 0;
}

bool ReValidateKeys() {
bool ReValidateKeys(const NTable::TScheme& scheme) {
if (DataTx && (DataTx->ProgramSize() || DataTx->IsKqpDataTx()))
return DataTx->ReValidateKeys();
return DataTx->ReValidateKeys(scheme);
return true;
}

Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,10 +611,10 @@ bool TPipeline::LoadWriteDetails(TTransactionContext& txc, const TActorContext&
} else if (writeOp->HasVolatilePrepareFlag()) {
// Since transaction is volatile it was never stored on disk, and it
// shouldn't have any artifacts yet.
writeOp->FillVolatileTxData(Self, txc);
writeOp->FillVolatileTxData(Self);

ui32 keysCount = 0;
keysCount = writeOp->ExtractKeys();
keysCount = writeOp->ExtractKeys(txc.DB.GetScheme());

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "LoadWriteDetails at " << Self->TabletID() << " loaded writeOp from memory " << writeOp->GetStep() << ":" << writeOp->GetTxId() << " keys extracted: " << keysCount);
} else {
Expand All @@ -632,11 +632,11 @@ bool TPipeline::LoadWriteDetails(TTransactionContext& txc, const TActorContext&
if (MaybeRequestMoreTxMemory(requiredMem, txc))
return false;

writeOp->FillTxData(Self, txc, target, txBody, std::move(locks), artifactFlags);
writeOp->FillTxData(Self, target, txBody, std::move(locks), artifactFlags);

ui32 keysCount = 0;
//if (Config.LimitActiveTx > 1)
keysCount = writeOp->ExtractKeys();
keysCount = writeOp->ExtractKeys(txc.DB.GetScheme());

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "LoadWriteDetails at " << Self->TabletID() << " loaded writeOp from db " << writeOp->GetStep() << ":" << writeOp->GetTxId() << " keys extracted: " << keysCount);
}
Expand Down Expand Up @@ -1633,7 +1633,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
{
const auto& rec = ev->Get()->Record;
TBasicOpInfo info(rec.GetTxId(), EOperationKind::WriteTx, NEvWrite::TConvertor::GetProposeFlags(rec.GetTxMode()), 0, receivedAt, tieBreakerIndex);
auto writeOp = MakeIntrusive<TWriteOperation>(info, std::move(ev), Self, txc);
auto writeOp = MakeIntrusive<TWriteOperation>(info, std::move(ev), Self);
writeOp->OperationSpan = std::move(operationSpan);
auto writeTx = writeOp->GetWriteTx();
Y_ABORT_UNLESS(writeTx);
Expand All @@ -1648,7 +1648,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
return writeOp;
}

writeTx->ExtractKeys(true);
writeTx->ExtractKeys(txc.DB.GetScheme(), true);

if (!writeTx->Ready()) {
badRequest(NEvWrite::TConvertor::ConvertErrCode(writeOp->GetWriteTx()->GetErrCode()), TStringBuilder() << "Cannot parse tx keys " << writeOp->GetTxId() << ". " << writeOp->GetWriteTx()->GetErrCode() << ": " << writeOp->GetWriteTx()->GetErrStr());
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,12 @@ class TPipeline : TNonCopyable {
return tx->RestoreTxData(Self, txc, ctx);
}

ERestoreDataStatus RestoreDataTx(
TWriteOperation* tx,
ERestoreDataStatus RestoreWriteTx(
TWriteOperation* writeOp,
TTransactionContext& txc
)
{
return tx->RestoreTxData(Self, txc);
return writeOp->RestoreTxData(Self, txc.DB);
}

void RegisterDistributedWrites(const TOperation::TPtr& op, NTable::TDatabase& db);
Expand Down
50 changes: 23 additions & 27 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@
namespace NKikimr {
namespace NDataShard {

TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev)
: UserDb(*self, txc.DB, globalTxId, TRowVersion::Min(), TRowVersion::Max(), EngineHostCounters, TAppData::TimeProvider->Now())
, KeyValidator(*self, txc.DB)
TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev)
: KeyValidator(*self)
, TabletId(self->TabletID())
, IsImmediate(ev.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE)
, GlobalTxId(globalTxId)
, ReceivedAt(receivedAt)
, TxSize(0)
, ErrCode(NKikimrTxDataShard::TError::OK)
Expand All @@ -47,18 +48,13 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc,
ComputeTxSize();
NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Add(TxSize);

UserDb.SetIsWriteTx(true);

const NKikimrDataEvents::TEvWrite& record = ev.Record;

if (record.GetLockTxId()) {
UserDb.SetLockTxId(record.GetLockTxId());
UserDb.SetLockNodeId(record.GetLockNodeId());
LockTxId = record.GetLockTxId();
LockNodeId = record.GetLockNodeId();
}

if (record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE)
UserDb.SetIsImmediateTx(true);

NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;

LOG_T("Parsing write transaction for " << globalTxId << " at " << TabletId << ", record: " << record.ShortDebugString());
Expand Down Expand Up @@ -218,12 +214,12 @@ void TValidatedWriteTx::SetTxKeys(const TUserTable& tableInfo)
}
}

ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors)
ui32 TValidatedWriteTx::ExtractKeys(const NTable::TScheme& scheme, bool allowErrors)
{
if (!HasOperations())
return 0;

bool isValid = ReValidateKeys();
bool isValid = ReValidateKeys(scheme);
if (allowErrors) {
if (!isValid) {
return 0;
Expand All @@ -235,11 +231,11 @@ ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors)
return KeysCount();
}

bool TValidatedWriteTx::ReValidateKeys()
bool TValidatedWriteTx::ReValidateKeys(const NTable::TScheme& scheme)
{
using EResult = NMiniKQL::IEngineFlat::EResult;

TKeyValidator::TValidateOptions options(UserDb);
TKeyValidator::TValidateOptions options(LockTxId, LockNodeId, false, IsImmediate, true, scheme);
auto [result, error] = GetKeyValidator().ValidateKeys(options);
if (result != EResult::Ok) {
ErrStr = std::move(error);
Expand Down Expand Up @@ -298,7 +294,7 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, ui64 tabletId)
TrackMemory();
}

TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self, TTransactionContext& txc)
TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self)
: TWriteOperation(op, self->TabletID())
{
SetTarget(ev->Sender);
Expand All @@ -310,7 +306,7 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::T
Orbit = std::move(evPtr->MoveOrbit());
WriteRequest.reset(evPtr.Release());

BuildWriteTx(self, txc);
BuildWriteTx(self);

TrackMemory();
}
Expand All @@ -329,7 +325,7 @@ void TWriteOperation::FillTxData(TValidatedWriteTx::TPtr writeTx)
WriteTx = writeTx;
}

void TWriteOperation::FillTxData(TDataShard* self, TTransactionContext& txc, const TActorId& target, const TString& txBody, const TVector<TSysTables::TLocksTable::TLock>& locks, ui64 artifactFlags)
void TWriteOperation::FillTxData(TDataShard* self, const TActorId& target, const TString& txBody, const TVector<TSysTables::TLocksTable::TLock>& locks, ui64 artifactFlags)
{
UntrackMemory();

Expand All @@ -345,20 +341,20 @@ void TWriteOperation::FillTxData(TDataShard* self, TTransactionContext& txc, con
}
ArtifactFlags = artifactFlags;
Y_ABORT_UNLESS(!WriteTx);
BuildWriteTx(self, txc);
BuildWriteTx(self);
Y_ABORT_UNLESS(WriteTx->Ready());

TrackMemory();
}

void TWriteOperation::FillVolatileTxData(TDataShard* self, TTransactionContext& txc)
void TWriteOperation::FillVolatileTxData(TDataShard* self)
{
UntrackMemory();

Y_ABORT_UNLESS(!WriteTx);
Y_ABORT_UNLESS(WriteRequest);

BuildWriteTx(self, txc);
BuildWriteTx(self);
Y_ABORT_UNLESS(WriteTx->Ready());


Expand Down Expand Up @@ -406,11 +402,11 @@ void TWriteOperation::ClearTxBody() {
TrackMemory();
}

TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self, TTransactionContext& txc)
TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self)
{
if (!WriteTx) {
Y_ABORT_UNLESS(WriteRequest);
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetGlobalTxId(), GetReceivedAt(), *WriteRequest);
WriteTx = std::make_shared<TValidatedWriteTx>(self, GetGlobalTxId(), GetReceivedAt(), *WriteRequest);
}
return WriteTx;
}
Expand Down Expand Up @@ -478,7 +474,7 @@ ui64 TWriteOperation::GetMemoryConsumption() const {
return res;
}

ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransactionContext& txc)
ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, NTable::TDatabase& db)
{
if (!WriteTx) {
ReleasedTxDataSize = 0;
Expand All @@ -493,10 +489,10 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransaction

TVector<TSysTables::TLocksTable::TLock> locks;
if (!IsImmediate() && !HasVolatilePrepareFlag()) {
NIceDb::TNiceDb db(txc.DB);
NIceDb::TNiceDb niceDb(db);

TString txBody;
bool ok = self->TransQueue.LoadTxDetails(db, GetTxId(), Target, txBody, locks, ArtifactFlags);
bool ok = self->TransQueue.LoadTxDetails(niceDb, GetTxId(), Target, txBody, locks, ArtifactFlags);
if (!ok) {
WriteRequest.reset();
ArtifactFlags = 0;
Expand All @@ -515,9 +511,9 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransaction

bool extractKeys = WriteTx->IsTxInfoLoaded();

WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetTxId(), GetReceivedAt(), *WriteRequest);
WriteTx = std::make_shared<TValidatedWriteTx>(self, GetTxId(), GetReceivedAt(), *WriteRequest);
if (WriteTx->Ready() && extractKeys) {
WriteTx->ExtractKeys(true);
WriteTx->ExtractKeys(db.GetScheme(), true);
}

if (!WriteTx->Ready()) {
Expand Down
Loading

0 comments on commit 40d870f

Please sign in to comment.