Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move UserDb to TExecuteWriteUnit #2282

Merged
merged 2 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ydb/core/tx/datashard/build_write_out_rs_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ EExecutionStatus TBuildWriteOutRSUnit::Execute(TOperation::TPtr op, TTransaction
return EExecutionStatus::Executed;
}

writeTx->SetReadVersion(DataShard.GetReadWriteVersions(writeOp).ReadVersion);

try {
const auto& kqpLocks = writeTx->GetKqpLocks() ? writeTx->GetKqpLocks().value() : NKikimrDataEvents::TKqpLocks{};
KqpFillOutReadSets(op->OutReadSets(), kqpLocks, true, nullptr, DataShard.SysLocksTable(), tabletId);
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,13 @@ class TDataShardEngineHost final
}

bool IsValidKey(TKeyDesc& key) const override {
TKeyValidator::TValidateOptions options(UserDb);
TKeyValidator::TValidateOptions options(
UserDb.GetLockTxId(),
UserDb.GetLockNodeId(),
UserDb.GetIsRepeatableSnapshot(),
UserDb.GetIsImmediateTx(),
UserDb.GetIsWriteTx()
);
return GetKeyValidator().IsValidKey(key, options);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ bool TValidatedDataTx::ReValidateKeys()
using EResult = NMiniKQL::IEngineFlat::EResult;

if (IsKqpTx()) {
TKeyValidator::TValidateOptions options(EngineBay.GetUserDb());
const auto& userDb = EngineBay.GetUserDb();
TKeyValidator::TValidateOptions options(userDb.GetLockTxId(), userDb.GetLockNodeId(), userDb.GetIsRepeatableSnapshot(), userDb.GetIsImmediateTx(), userDb.GetIsWriteTx());
auto [result, error] = EngineBay.GetKeyValidator().ValidateKeys(options);
if (result != EResult::Ok) {
ErrStr = std::move(error);
Expand Down
16 changes: 6 additions & 10 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ namespace NKikimr {
namespace NDataShard {

TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev)
: UserDb(*self, txc.DB, globalTxId, TRowVersion::Min(), TRowVersion::Max(), EngineHostCounters, TAppData::TimeProvider->Now())
, KeyValidator(*self, txc.DB)
: KeyValidator(*self, txc.DB)
, TabletId(self->TabletID())
, IsImmediate(ev.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE)
, GlobalTxId(globalTxId)
, ReceivedAt(receivedAt)
, TxSize(0)
, ErrCode(NKikimrTxDataShard::TError::OK)
Expand All @@ -47,18 +48,13 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc,
ComputeTxSize();
NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Add(TxSize);

UserDb.SetIsWriteTx(true);

const NKikimrDataEvents::TEvWrite& record = ev.Record;

if (record.GetLockTxId()) {
UserDb.SetLockTxId(record.GetLockTxId());
UserDb.SetLockNodeId(record.GetLockNodeId());
LockTxId = record.GetLockTxId();
LockNodeId = record.GetLockNodeId();
}

if (record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE)
UserDb.SetIsImmediateTx(true);

NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;

LOG_T("Parsing write transaction for " << globalTxId << " at " << TabletId << ", record: " << record.ShortDebugString());
Expand Down Expand Up @@ -239,7 +235,7 @@ bool TValidatedWriteTx::ReValidateKeys()
{
using EResult = NMiniKQL::IEngineFlat::EResult;

TKeyValidator::TValidateOptions options(UserDb);
TKeyValidator::TValidateOptions options(LockTxId, LockNodeId, false, IsImmediate, true);
auto [result, error] = GetKeyValidator().ValidateKeys(options);
if (result != EResult::Ok) {
ErrStr = std::move(error);
Expand Down
64 changes: 10 additions & 54 deletions ydb/core/tx/datashard/datashard_write_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,10 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
}

ui64 GetTxId() const override {
return UserDb.GetGlobalTxId();
return GlobalTxId;
}

ui64 LockTxId() const {
return UserDb.GetLockTxId();
}
ui32 LockNodeId() const {
return UserDb.GetLockNodeId();
}
bool Immediate() const {
return UserDb.GetIsImmediateTx();
}

bool NeedDiagnostics() const {
return true;
}
Expand All @@ -59,7 +51,7 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
return TxInfo().HasWrites();
}
bool HasLockedWrites() const {
return HasWrites() && LockTxId();
return HasWrites() && LockTxId;
}
bool HasDynamicWrites() const {
return TxInfo().DynKeysCount != 0;
Expand All @@ -72,48 +64,9 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
return KeyValidator;
}

TDataShardUserDb& GetUserDb() {
return UserDb;
}

const TDataShardUserDb& GetUserDb() const {
return UserDb;
}

bool CanCancel();
bool CheckCancelled();

void SetWriteVersion(TRowVersion writeVersion) {
UserDb.SetWriteVersion(writeVersion);
}
void SetReadVersion(TRowVersion readVersion) {
UserDb.SetReadVersion(readVersion);
}

void SetVolatileTxId(ui64 txId) {
UserDb.SetVolatileTxId(txId);
}

TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const {
return UserDb.GetCollectedChanges();
}
void ResetCollectedChanges() {
UserDb.ResetCollectedChanges();
}

TVector<ui64> GetVolatileCommitTxIds() const {
return UserDb.GetVolatileCommitTxIds();
}
const absl::flat_hash_set<ui64>& GetVolatileDependencies() const {
return UserDb.GetVolatileDependencies();
}
std::optional<ui64> GetVolatileChangeGroup() {
return UserDb.GetChangeGroup();
}
bool GetVolatileCommitOrdered() const {
return UserDb.GetVolatileCommitOrdered();
}

ui32 ExtractKeys(bool allowErrors);
bool ReValidateKeys();

Expand Down Expand Up @@ -153,12 +106,15 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
void ComputeTxSize();

private:
TDataShardUserDb UserDb;
TKeyValidator KeyValidator;
NMiniKQL::TEngineHostCounters EngineHostCounters;

const ui64 TabletId;
const bool IsImmediate;

YDB_READONLY_DEF(ui64, LockTxId);
YDB_READONLY_DEF(ui32, LockNodeId);

YDB_READONLY_DEF(ui64, GlobalTxId);
YDB_READONLY_DEF(TTableId, TableId);
YDB_READONLY_DEF(std::optional<NKikimrDataEvents::TKqpLocks>, KqpLocks);
YDB_READONLY_DEF(std::vector<ui32>, ColumnIds);
Expand Down Expand Up @@ -267,11 +223,11 @@ class TWriteOperation : public TOperation {
}

ui64 LockTxId() const override {
return WriteTx ? WriteTx->LockTxId() : 0;
return WriteTx ? WriteTx->GetLockTxId() : 0;
}

ui32 LockNodeId() const override {
return WriteTx ? WriteTx->LockNodeId() : 0;
return WriteTx ? WriteTx->GetLockNodeId() : 0;
}

bool HasLockedWrites() const override {
Expand Down
55 changes: 29 additions & 26 deletions ydb/core/tx/datashard/execute_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ class TExecuteWriteUnit : public TExecutionUnit {
DataShard.SubscribeNewLocks(ctx);
}

EExecutionStatus OnTabletNotReady(TWriteOperation& writeOp, TValidatedWriteTx& writeTx, TTransactionContext& txc, const TActorContext& ctx)
EExecutionStatus OnTabletNotReady(TDataShardUserDb& userDb, TWriteOperation& writeOp, TTransactionContext& txc, const TActorContext& ctx)
{
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Tablet " << DataShard.TabletID() << " is not ready for " << writeOp << " execution");

DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);

writeTx.ResetCollectedChanges();
userDb.ResetCollectedChanges();

writeOp.ReleaseTxData(txc);
return EExecutionStatus::Restart;
}

void DoUpdateToUserDb(TWriteOperation* writeOp, TTransactionContext& txc, const TActorContext& ctx) {
void DoUpdateToUserDb(TDataShardUserDb& userDb, TWriteOperation* writeOp, TTransactionContext& txc, const TActorContext& ctx) {
TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();

if (!writeTx->HasOperations()) {
Expand Down Expand Up @@ -122,7 +122,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeInfo));
}

writeTx->GetUserDb().UpdateRow(fullTableId, key, ops);
userDb.UpdateRow(fullTableId, key, ops);
}

DataShard.IncCounter(COUNTER_WRITE_ROWS, matrix.GetRowCount());
Expand Down Expand Up @@ -152,7 +152,6 @@ class TExecuteWriteUnit : public TExecutionUnit {
const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();

DataShard.ReleaseCache(*writeOp);
writeTx->GetUserDb().ResetCounters();

if (writeOp->IsTxDataReleased()) {
switch (Pipeline.RestoreDataTx(writeOp, txc)) {
Expand Down Expand Up @@ -194,8 +193,21 @@ class TExecuteWriteUnit : public TExecutionUnit {
return EExecutionStatus::Executed;
}

NMiniKQL::TEngineHostCounters engineHostCounters;
const ui64 txId = writeTx->GetTxId();
const auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(writeOp);

TDataShardUserDb userDb(DataShard, txc.DB, op->GetGlobalTxId(), readVersion, writeVersion, engineHostCounters, TAppData::TimeProvider->Now());
userDb.SetIsWriteTx(true);
userDb.SetIsImmediateTx(op->IsImmediate());
userDb.SetLockTxId(writeTx->GetLockTxId());
userDb.SetLockNodeId(writeTx->GetLockNodeId());

if (op->HasVolatilePrepareFlag()) {
userDb.SetVolatileTxId(txId);
}

try {
const ui64 txId = writeTx->GetTxId();
const auto* kqpLocks = writeTx->GetKqpLocks() ? &writeTx->GetKqpLocks().value() : nullptr;
const auto& inReadSets = op->InReadSets();
auto& awaitingDecisions = op->AwaitingDecisions();
Expand Down Expand Up @@ -273,18 +285,9 @@ class TExecuteWriteUnit : public TExecutionUnit {
return EExecutionStatus::Executed;
}

auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(writeOp);
writeTx->SetReadVersion(readVersion);
writeTx->SetWriteVersion(writeVersion);

if (op->HasVolatilePrepareFlag()) {
writeTx->SetVolatileTxId(txId);
}


KqpCommitLocks(tabletId, kqpLocks, sysLocks, writeVersion, writeTx->GetUserDb());
KqpCommitLocks(tabletId, kqpLocks, sysLocks, writeVersion, userDb);

DoUpdateToUserDb(writeOp, txc, ctx);
DoUpdateToUserDb(userDb, writeOp, txc, ctx);

writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCompleted(DataShard.TabletID(), writeOp->GetTxId()));

Expand All @@ -294,7 +297,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
writeResult->Record.SetStep(op->GetStep());

if (Pipeline.AddLockDependencies(op, guardLocks)) {
writeTx->ResetCollectedChanges();
userDb.ResetCollectedChanges();
writeOp->ReleaseTxData(txc);
if (txc.DB.HasChanges()) {
txc.DB.RollbackChanges();
Expand All @@ -304,17 +307,17 @@ class TExecuteWriteUnit : public TExecutionUnit {

// Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies
// Such transactions would have no participants and become immediately committed
auto commitTxIds = writeTx->GetVolatileCommitTxIds();
auto commitTxIds = userDb.GetVolatileCommitTxIds();
if (commitTxIds) {
TVector<ui64> participants(awaitingDecisions.begin(), awaitingDecisions.end());
DataShard.GetVolatileTxManager().PersistAddVolatileTx(
txId,
writeVersion,
commitTxIds,
writeTx->GetVolatileDependencies(),
userDb.GetVolatileDependencies(),
participants,
writeTx->GetVolatileChangeGroup(),
writeTx->GetVolatileCommitOrdered(),
userDb.GetChangeGroup(),
userDb.GetVolatileCommitOrdered(),
txc
);
}
Expand All @@ -335,11 +338,11 @@ class TExecuteWriteUnit : public TExecutionUnit {
// Note: may erase persistent locks, must be after we persist volatile tx
AddLocksToResult(writeOp, ctx);

if (auto changes = std::move(writeTx->GetCollectedChanges())) {
if (auto changes = std::move(userDb.GetCollectedChanges())) {
op->ChangeRecords() = std::move(changes);
}

auto& counters = writeTx->GetUserDb().GetCounters();
const auto& counters = userDb.GetCounters();
KqpUpdateDataShardStatCounters(DataShard, counters);
KqpFillTxStats(DataShard, counters, *writeResult->Record.MutableTxStats());

Expand All @@ -359,9 +362,9 @@ class TExecuteWriteUnit : public TExecutionUnit {
}
return EExecutionStatus::Continue;
} catch (const TNotReadyTabletException&) {
return OnTabletNotReady(*writeOp, *writeTx, txc, ctx);
return OnTabletNotReady(userDb, *writeOp, txc, ctx);
} catch (const TLockedWriteLimitException&) {
writeTx->ResetCollectedChanges();
userDb.ResetCollectedChanges();

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Shard " << DataShard.TabletID() << " cannot write more uncommitted changes");

Expand Down
17 changes: 11 additions & 6 deletions ydb/core/tx/datashard/key_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ void TKeyValidator::AddWriteRange(const TTableId& tableId, const TTableRange& ra
Info.SetLoaded();
}

TKeyValidator::TValidateOptions::TValidateOptions(const TDataShardUserDb& userDb)
: IsLockTxId(static_cast<bool>(userDb.GetLockTxId()))
, IsLockNodeId(static_cast<bool>(userDb.GetLockNodeId()))
, IsRepeatableSnapshot(userDb.GetIsRepeatableSnapshot())
, IsImmediateTx(userDb.GetIsImmediateTx())
, IsWriteTx(userDb.GetIsWriteTx())
TKeyValidator::TValidateOptions::TValidateOptions(
ui64 LockTxId,
ui32 LockNodeId,
bool isRepeatableSnapshot,
bool isImmediateTx,
bool isWriteTx)
: IsLockTxId(static_cast<bool>(LockTxId))
, IsLockNodeId(static_cast<bool>(LockNodeId))
, IsRepeatableSnapshot(isRepeatableSnapshot)
, IsImmediateTx(isImmediateTx)
, IsWriteTx(isWriteTx)
{
}

Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/datashard/key_validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ class TKeyValidator {
bool IsRepeatableSnapshot;
bool IsImmediateTx;
bool IsWriteTx;

TValidateOptions(const TDataShardUserDb& userDb);

TValidateOptions(ui64 LockTxId,
ui32 LockNodeId,
bool isRepeatableSnapshot,
bool isImmediateTx,
bool isWriteTx);
};

bool IsValidKey(TKeyDesc& key, const TValidateOptions& options) const;
Expand Down
Loading