Skip to content

Commit

Permalink
Merge 3fffcc7 into 1f9cb6b
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Feb 27, 2024
2 parents 1f9cb6b + 3fffcc7 commit 963e83d
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 102 deletions.
2 changes: 0 additions & 2 deletions ydb/core/tx/datashard/build_write_out_rs_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ EExecutionStatus TBuildWriteOutRSUnit::Execute(TOperation::TPtr op, TTransaction
return EExecutionStatus::Executed;
}

writeTx->SetReadVersion(DataShard.GetReadWriteVersions(writeOp).ReadVersion);

try {
const auto& kqpLocks = writeTx->GetKqpLocks() ? writeTx->GetKqpLocks().value() : NKikimrDataEvents::TKqpLocks{};
KqpFillOutReadSets(op->OutReadSets(), kqpLocks, true, nullptr, DataShard.SysLocksTable(), tabletId);
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,13 @@ class TDataShardEngineHost final
}

bool IsValidKey(TKeyDesc& key) const override {
TKeyValidator::TValidateOptions options(UserDb);
TKeyValidator::TValidateOptions options(
UserDb.GetLockTxId(),
UserDb.GetLockNodeId(),
UserDb.GetIsRepeatableSnapshot(),
UserDb.GetIsImmediateTx(),
UserDb.GetIsWriteTx()
);
return GetKeyValidator().IsValidKey(key, options);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ bool TValidatedDataTx::ReValidateKeys()
using EResult = NMiniKQL::IEngineFlat::EResult;

if (IsKqpTx()) {
TKeyValidator::TValidateOptions options(EngineBay.GetUserDb());
const auto& userDb = EngineBay.GetUserDb();
TKeyValidator::TValidateOptions options(userDb.GetLockTxId(), userDb.GetLockNodeId(), userDb.GetIsRepeatableSnapshot(), userDb.GetIsImmediateTx(), userDb.GetIsWriteTx());
auto [result, error] = EngineBay.GetKeyValidator().ValidateKeys(options);
if (result != EResult::Ok) {
ErrStr = std::move(error);
Expand Down
16 changes: 6 additions & 10 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ namespace NKikimr {
namespace NDataShard {

TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev)
: UserDb(*self, txc.DB, globalTxId, TRowVersion::Min(), TRowVersion::Max(), EngineHostCounters, TAppData::TimeProvider->Now())
, KeyValidator(*self, txc.DB)
: KeyValidator(*self, txc.DB)
, TabletId(self->TabletID())
, IsImmediate(ev.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE)
, GlobalTxId(globalTxId)
, ReceivedAt(receivedAt)
, TxSize(0)
, ErrCode(NKikimrTxDataShard::TError::OK)
Expand All @@ -47,18 +48,13 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc,
ComputeTxSize();
NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Add(TxSize);

UserDb.SetIsWriteTx(true);

const NKikimrDataEvents::TEvWrite& record = ev.Record;

if (record.GetLockTxId()) {
UserDb.SetLockTxId(record.GetLockTxId());
UserDb.SetLockNodeId(record.GetLockNodeId());
LockTxId = record.GetLockTxId();
LockNodeId = record.GetLockNodeId();
}

if (record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE)
UserDb.SetIsImmediateTx(true);

NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;

LOG_T("Parsing write transaction for " << globalTxId << " at " << TabletId << ", record: " << record.ShortDebugString());
Expand Down Expand Up @@ -239,7 +235,7 @@ bool TValidatedWriteTx::ReValidateKeys()
{
using EResult = NMiniKQL::IEngineFlat::EResult;

TKeyValidator::TValidateOptions options(UserDb);
TKeyValidator::TValidateOptions options(LockTxId, LockNodeId, false, IsImmediate, true);
auto [result, error] = GetKeyValidator().ValidateKeys(options);
if (result != EResult::Ok) {
ErrStr = std::move(error);
Expand Down
64 changes: 10 additions & 54 deletions ydb/core/tx/datashard/datashard_write_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,10 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
}

ui64 GetTxId() const override {
return UserDb.GetGlobalTxId();
return GlobalTxId;
}

ui64 LockTxId() const {
return UserDb.GetLockTxId();
}
ui32 LockNodeId() const {
return UserDb.GetLockNodeId();
}
bool Immediate() const {
return UserDb.GetIsImmediateTx();
}

bool NeedDiagnostics() const {
return true;
}
Expand All @@ -59,7 +51,7 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
return TxInfo().HasWrites();
}
bool HasLockedWrites() const {
return HasWrites() && LockTxId();
return HasWrites() && LockTxId;
}
bool HasDynamicWrites() const {
return TxInfo().DynKeysCount != 0;
Expand All @@ -72,48 +64,9 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
return KeyValidator;
}

TDataShardUserDb& GetUserDb() {
return UserDb;
}

const TDataShardUserDb& GetUserDb() const {
return UserDb;
}

bool CanCancel();
bool CheckCancelled();

void SetWriteVersion(TRowVersion writeVersion) {
UserDb.SetWriteVersion(writeVersion);
}
void SetReadVersion(TRowVersion readVersion) {
UserDb.SetReadVersion(readVersion);
}

void SetVolatileTxId(ui64 txId) {
UserDb.SetVolatileTxId(txId);
}

TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const {
return UserDb.GetCollectedChanges();
}
void ResetCollectedChanges() {
UserDb.ResetCollectedChanges();
}

TVector<ui64> GetVolatileCommitTxIds() const {
return UserDb.GetVolatileCommitTxIds();
}
const absl::flat_hash_set<ui64>& GetVolatileDependencies() const {
return UserDb.GetVolatileDependencies();
}
std::optional<ui64> GetVolatileChangeGroup() {
return UserDb.GetChangeGroup();
}
bool GetVolatileCommitOrdered() const {
return UserDb.GetVolatileCommitOrdered();
}

ui32 ExtractKeys(bool allowErrors);
bool ReValidateKeys();

Expand Down Expand Up @@ -153,12 +106,15 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
void ComputeTxSize();

private:
TDataShardUserDb UserDb;
TKeyValidator KeyValidator;
NMiniKQL::TEngineHostCounters EngineHostCounters;

const ui64 TabletId;
const bool IsImmediate;

YDB_READONLY_DEF(ui64, LockTxId);
YDB_READONLY_DEF(ui32, LockNodeId);

YDB_READONLY_DEF(ui64, GlobalTxId);
YDB_READONLY_DEF(TTableId, TableId);
YDB_READONLY_DEF(std::optional<NKikimrDataEvents::TKqpLocks>, KqpLocks);
YDB_READONLY_DEF(std::vector<ui32>, ColumnIds);
Expand Down Expand Up @@ -267,11 +223,11 @@ class TWriteOperation : public TOperation {
}

ui64 LockTxId() const override {
return WriteTx ? WriteTx->LockTxId() : 0;
return WriteTx ? WriteTx->GetLockTxId() : 0;
}

ui32 LockNodeId() const override {
return WriteTx ? WriteTx->LockNodeId() : 0;
return WriteTx ? WriteTx->GetLockNodeId() : 0;
}

bool HasLockedWrites() const override {
Expand Down
55 changes: 29 additions & 26 deletions ydb/core/tx/datashard/execute_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ class TExecuteWriteUnit : public TExecutionUnit {
DataShard.SubscribeNewLocks(ctx);
}

EExecutionStatus OnTabletNotReady(TWriteOperation& writeOp, TValidatedWriteTx& writeTx, TTransactionContext& txc, const TActorContext& ctx)
EExecutionStatus OnTabletNotReady(TDataShardUserDb& userDb, TWriteOperation& writeOp, TTransactionContext& txc, const TActorContext& ctx)
{
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Tablet " << DataShard.TabletID() << " is not ready for " << writeOp << " execution");

DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);

writeTx.ResetCollectedChanges();
userDb.ResetCollectedChanges();

writeOp.ReleaseTxData(txc);
return EExecutionStatus::Restart;
}

void DoUpdateToUserDb(TWriteOperation* writeOp, TTransactionContext& txc, const TActorContext& ctx) {
void DoUpdateToUserDb(TDataShardUserDb& userDb, TWriteOperation* writeOp, TTransactionContext& txc, const TActorContext& ctx) {
TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();

if (!writeTx->HasOperations()) {
Expand Down Expand Up @@ -122,7 +122,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeInfo));
}

writeTx->GetUserDb().UpdateRow(fullTableId, key, ops);
userDb.UpdateRow(fullTableId, key, ops);
}

DataShard.IncCounter(COUNTER_WRITE_ROWS, matrix.GetRowCount());
Expand Down Expand Up @@ -152,7 +152,6 @@ class TExecuteWriteUnit : public TExecutionUnit {
const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();

DataShard.ReleaseCache(*writeOp);
writeTx->GetUserDb().ResetCounters();

if (writeOp->IsTxDataReleased()) {
switch (Pipeline.RestoreDataTx(writeOp, txc)) {
Expand Down Expand Up @@ -194,8 +193,21 @@ class TExecuteWriteUnit : public TExecutionUnit {
return EExecutionStatus::Executed;
}

NMiniKQL::TEngineHostCounters engineHostCounters;
const ui64 txId = writeTx->GetTxId();
const auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(writeOp);

TDataShardUserDb userDb(DataShard, txc.DB, op->GetGlobalTxId(), readVersion, writeVersion, engineHostCounters, TAppData::TimeProvider->Now());
userDb.SetIsWriteTx(true);
userDb.SetIsImmediateTx(op->IsImmediate());
userDb.SetLockTxId(writeTx->GetLockTxId());
userDb.SetLockNodeId(writeTx->GetLockNodeId());

if (op->HasVolatilePrepareFlag()) {
userDb.SetVolatileTxId(txId);
}

try {
const ui64 txId = writeTx->GetTxId();
const auto* kqpLocks = writeTx->GetKqpLocks() ? &writeTx->GetKqpLocks().value() : nullptr;
const auto& inReadSets = op->InReadSets();
auto& awaitingDecisions = op->AwaitingDecisions();
Expand Down Expand Up @@ -273,18 +285,9 @@ class TExecuteWriteUnit : public TExecutionUnit {
return EExecutionStatus::Executed;
}

auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(writeOp);
writeTx->SetReadVersion(readVersion);
writeTx->SetWriteVersion(writeVersion);

if (op->HasVolatilePrepareFlag()) {
writeTx->SetVolatileTxId(txId);
}


KqpCommitLocks(tabletId, kqpLocks, sysLocks, writeVersion, writeTx->GetUserDb());
KqpCommitLocks(tabletId, kqpLocks, sysLocks, writeVersion, userDb);

DoUpdateToUserDb(writeOp, txc, ctx);
DoUpdateToUserDb(userDb, writeOp, txc, ctx);

writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCompleted(DataShard.TabletID(), writeOp->GetTxId()));

Expand All @@ -294,7 +297,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
writeResult->Record.SetStep(op->GetStep());

if (Pipeline.AddLockDependencies(op, guardLocks)) {
writeTx->ResetCollectedChanges();
userDb.ResetCollectedChanges();
writeOp->ReleaseTxData(txc);
if (txc.DB.HasChanges()) {
txc.DB.RollbackChanges();
Expand All @@ -304,17 +307,17 @@ class TExecuteWriteUnit : public TExecutionUnit {

// Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies
// Such transactions would have no participants and become immediately committed
auto commitTxIds = writeTx->GetVolatileCommitTxIds();
auto commitTxIds = userDb.GetVolatileCommitTxIds();
if (commitTxIds) {
TVector<ui64> participants(awaitingDecisions.begin(), awaitingDecisions.end());
DataShard.GetVolatileTxManager().PersistAddVolatileTx(
txId,
writeVersion,
commitTxIds,
writeTx->GetVolatileDependencies(),
userDb.GetVolatileDependencies(),
participants,
writeTx->GetVolatileChangeGroup(),
writeTx->GetVolatileCommitOrdered(),
userDb.GetChangeGroup(),
userDb.GetVolatileCommitOrdered(),
txc
);
}
Expand All @@ -335,11 +338,11 @@ class TExecuteWriteUnit : public TExecutionUnit {
// Note: may erase persistent locks, must be after we persist volatile tx
AddLocksToResult(writeOp, ctx);

if (auto changes = std::move(writeTx->GetCollectedChanges())) {
if (auto changes = std::move(userDb.GetCollectedChanges())) {
op->ChangeRecords() = std::move(changes);
}

auto& counters = writeTx->GetUserDb().GetCounters();
const auto& counters = userDb.GetCounters();
KqpUpdateDataShardStatCounters(DataShard, counters);
KqpFillTxStats(DataShard, counters, *writeResult->Record.MutableTxStats());

Expand All @@ -359,9 +362,9 @@ class TExecuteWriteUnit : public TExecutionUnit {
}
return EExecutionStatus::Continue;
} catch (const TNotReadyTabletException&) {
return OnTabletNotReady(*writeOp, *writeTx, txc, ctx);
return OnTabletNotReady(userDb, *writeOp, txc, ctx);
} catch (const TLockedWriteLimitException&) {
writeTx->ResetCollectedChanges();
userDb.ResetCollectedChanges();

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Shard " << DataShard.TabletID() << " cannot write more uncommitted changes");

Expand Down
17 changes: 11 additions & 6 deletions ydb/core/tx/datashard/key_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ void TKeyValidator::AddWriteRange(const TTableId& tableId, const TTableRange& ra
Info.SetLoaded();
}

TKeyValidator::TValidateOptions::TValidateOptions(const TDataShardUserDb& userDb)
: IsLockTxId(static_cast<bool>(userDb.GetLockTxId()))
, IsLockNodeId(static_cast<bool>(userDb.GetLockNodeId()))
, IsRepeatableSnapshot(userDb.GetIsRepeatableSnapshot())
, IsImmediateTx(userDb.GetIsImmediateTx())
, IsWriteTx(userDb.GetIsWriteTx())
TKeyValidator::TValidateOptions::TValidateOptions(
ui64 LockTxId,
ui32 LockNodeId,
bool isRepeatableSnapshot,
bool isImmediateTx,
bool isWriteTx)
: IsLockTxId(static_cast<bool>(LockTxId))
, IsLockNodeId(static_cast<bool>(LockNodeId))
, IsRepeatableSnapshot(isRepeatableSnapshot)
, IsImmediateTx(isImmediateTx)
, IsWriteTx(isWriteTx)
{
}

Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/datashard/key_validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ class TKeyValidator {
bool IsRepeatableSnapshot;
bool IsImmediateTx;
bool IsWriteTx;

TValidateOptions(const TDataShardUserDb& userDb);

TValidateOptions(ui64 LockTxId,
ui32 LockNodeId,
bool isRepeatableSnapshot,
bool isImmediateTx,
bool isWriteTx);
};

bool IsValidKey(TKeyDesc& key, const TValidateOptions& options) const;
Expand Down

0 comments on commit 963e83d

Please sign in to comment.