From a5d4df7367ec1054af3e0cd0e75d492001e73b5e Mon Sep 17 00:00:00 2001
From: ulya-sidorina
Date: Fri, 3 Jan 2025 19:58:20 +0100
Subject: [PATCH] feat(data_integrity_trails): add logging of acquired locks

---
 .../kqp/common/kqp_data_integrity_trails.h     | 97 ++++++++++++++++++-
 .../kqp/executer_actor/kqp_data_executer.cpp   | 14 ++-
 .../kqp_data_integrity_trails_ut.cpp           | 47 +--------
 .../check_distributed_erase_tx_unit.cpp        |  2 +-
 ydb/core/tx/datashard/datashard.cpp            | 18 ++--
 .../tx/datashard/datashard_common_upload.cpp   |  8 +-
 .../tx/datashard/datashard_common_upload.h     |  2 +-
 .../tx/datashard/datashard_direct_erase.cpp    | 16 +--
 .../tx/datashard/datashard_direct_erase.h      |  6 +-
 .../datashard_direct_transaction.cpp           |  4 +-
 .../datashard/datashard_direct_transaction.h   |  4 +-
 .../tx/datashard/datashard_direct_upload.cpp   |  4 +-
 .../tx/datashard/datashard_direct_upload.h     |  2 +-
 ydb/core/tx/datashard/datashard_impl.h         |  6 +-
 .../tx/datashard/datashard_integrity_trails.h  | 26 +++++
 ydb/core/tx/datashard/datashard_kqp.cpp        |  5 +-
 .../tx/datashard/datashard_repl_apply.cpp      |  2 +-
 .../tx/datashard/datashard_s3_upload_rows.cpp  |  4 +-
 ydb/core/tx/datashard/datashard_user_db.cpp    |  5 +-
 ydb/core/tx/datashard/direct_tx_unit.cpp       |  2 +-
 .../execute_distributed_erase_tx_unit.cpp      | 12 ++-
 ydb/core/tx/locks/locks.cpp                    | 14 ++-
 ydb/core/tx/locks/locks.h                      |  4 +-
 23 files changed, 206 insertions(+), 98 deletions(-)

diff --git a/ydb/core/kqp/common/kqp_data_integrity_trails.h b/ydb/core/kqp/common/kqp_data_integrity_trails.h
index 71198204a55b..9278577dde45 100644
--- a/ydb/core/kqp/common/kqp_data_integrity_trails.h
+++ b/ydb/core/kqp/common/kqp_data_integrity_trails.h
@@ -6,6 +6,8 @@
 #include
 #include
+#include
+#include
 
 namespace NKikimr {
 namespace NDataIntegrity {
@@ -97,23 +99,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction
 }
 
 // DataExecuter
-inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
-    auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) {
+inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
+    auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) {
         TStringStream ss;
         LogKeyValue("Component", "Executer", ss);
+        LogKeyValue("Type", "Request", ss);
         LogKeyValue("TraceId", traceId, ss);
         LogKeyValue("PhyTxId", ToString(txId), ss);
+        LogKeyValue("Locks", "[" + txLocksDebugStr + "]", ss);
         if (shardId) {
             LogKeyValue("ShardId", ToString(*shardId), ss);
         }
-        LogKeyValue("Type", type, ss, /*last*/ true);
+        LogKeyValue("TxType", type, ss, /*last*/ true);
+
+        return ss.Str();
+    };
+
+    LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId));
+}
+
+inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) {
+    auto log = [](const auto& state, const auto& traceId, const auto& ev) {
+        const auto& record = ev->Get()->Record;
+
+        TStringStream ss;
+        LogKeyValue("Component", "Executer", ss);
+        LogKeyValue("Type", "Response", ss);
+        LogKeyValue("State", state, ss);
+        LogKeyValue("TraceId", traceId, ss);
+        LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
+        LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);
+
+        TStringBuilder locksDebugStr;
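+        // Collect the locks reported by the shard into one bracketed, human-readable list.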
+        locksDebugStr << "[";
+        for (const auto& lock : record.GetTxLocks()) {
+            locksDebugStr << lock.ShortDebugString();
+        }
+        locksDebugStr << "]";
+
+        LogKeyValue("Locks", locksDebugStr, ss);
+        LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss);
+
+        NYql::TIssues issues;
+        NYql::IssuesFromMessage(record.GetIssues(), issues);
+        LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true);
+
+        return ss.Str();
+    };
+
+    LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
+}
+
+inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) {
+    auto log = [](const auto& state, const auto& traceId, const auto& ev) {
+        const auto& record = ev->Get()->Record;
+
+        TStringStream ss;
+        LogKeyValue("Component", "Executer", ss);
+        LogKeyValue("Type", "Response", ss);
+        LogKeyValue("State", state, ss);
+        LogKeyValue("TraceId", traceId, ss);
+        LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
+        LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);
+
+        TStringBuilder locksDebugStr;
+        locksDebugStr << "[";
+        for (const auto& lock : record.GetTxLocks()) {
+            locksDebugStr << lock.ShortDebugString();
+        }
+        locksDebugStr << "]";
+
+        LogKeyValue("Locks", locksDebugStr, ss);
+        LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss);
+        LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true);
+
+        return ss.Str();
+    };
+
+    LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
+}
+
+template <typename TActorResultInfo>
+inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) {
+    auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) {
+        TStringStream ss;
+        LogKeyValue("Component", "Executer", ss);
+        LogKeyValue("Type", type, ss);
+        LogKeyValue("TraceId", traceId, ss);
+        LogKeyValue("PhyTxId", ToString(txId), ss);
+
+        TStringBuilder locksDebugStr;
+        locksDebugStr << "[";
+        for (const auto& lock : info.GetLocks()) {
+            locksDebugStr << lock.ShortDebugString();
+        }
+        locksDebugStr << "]";
+
+        LogKeyValue("Locks", locksDebugStr, ss);
 
         return ss.Str();
     };
 
-    LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId));
+    LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info));
 }
 
 // WriteActor,BufferActor
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 410ecd751fac..7eda49dc7294 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -228,6 +228,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>
         if (data.GetData().Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
             NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
             YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
+            NDataIntegrity::LogIntegrityTrails("InputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
             for (auto& lock : info.GetLocks()) {
                 if (!TxManager) {
                     Locks.push_back(lock);
@@ -246,6 +247,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>
         if (data.GetData().Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
             NKikimrKqp::TEvKqpOutputActorResultInfo info;
             YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
+            NDataIntegrity::LogIntegrityTrails("OutputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
             for (auto& lock : info.GetLocks()) {
                 if (!TxManager) {
                     Locks.push_back(lock);
@@ -523,6 +525,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>
+        NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
         LOG_D("Got propose result, shard: " << shardId << ", status: "
             << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
             << ", error: " << res->GetError());
@@ -592,6 +595,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>
         NYql::TIssues issues;
         NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
+        NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
         LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId
             << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
             << ", TxId=" << ev->Get()->Record.GetTxId()
@@ -1122,7 +1126,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>
-        NDataIntegrity::LogIntegrityTrails("PlannedTx", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
+        NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
         LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards");
 
         Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true));
@@ -1263,6 +1267,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>
         NYql::TIssues issues;
         NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
+        NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
         LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId
             << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
             << ", TxId=" << ev->Get()->Record.GetTxId()
@@ -1334,6 +1339,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>
+        NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
         LOG_D("Got propose result, shard: " << shardId << ", status: "
             << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
             << ", error: " << res->GetError());
@@ -1824,7 +1830,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>
-            NDataIntegrity::LogIntegrityTrails("DatashardTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
+            NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
+                Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
 
             ResponseEv->Orbit.Fork(evData->Orbit);
             ev = std::move(evData);
@@ -1860,7 +1867,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>
-            NDataIntegrity::LogIntegrityTrails("EvWriteTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
+            NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
+                Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
 
             auto shardsToString = [](const auto& shards) {
                 TStringBuilder builder;
diff --git a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
index f8d362bb530e..86afdd55b2c3 100644
--- a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
+++ b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
@@ -43,7 +43,7 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
         }
 
         // check executer logs
-        UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 1 : 0);
+        UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0);
         // check session actor logs
         UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), LogEnabled ? 2 : 0);
         // check grpc logs
@@ -203,8 +203,8 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
             UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
         }
 
-        // check executer logs (should be empty, because executer only logs modification operations)
-        UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 0);
+        // check executer logs (should be 1, because the executer now logs the result of the read actor)
+        UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
         // check session actor logs
         UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
         // check grpc logs
@@ -212,47 +212,6 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
         // check datashard logs (should be empty, because DataShard only logs modification operations)
         UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0);
     }
-
-    Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) {
-        TStringStream ss;
-        {
-            TKikimrSettings serverSettings;
-            serverSettings.LogStream = &ss;
-            TKikimrRunner kikimr(serverSettings);
-            kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
-            NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
-
-            const auto query = R"(
-                --!syntax_v1
-
-                UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
-                    (3u, "Value3"),
-                    (101u, "Value101"),
-                    (201u, "Value201");
-            )";
-
-            if (Streaming) {
-                auto result = client.StreamExecuteYqlScript(query).GetValueSync();
-                UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-                CollectStreamResult(result);
-            } else {
-                auto result = client.ExecuteYqlScript(query).GetValueSync();
-                UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-            }
-        }
-
-        // check executer logs
-        UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
-        // check session actor logs (should contain double logs because this query was executed via worker actor)
-        UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 4);
-        // check grpc logs
-        UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
-        // check datashard logs
-        UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
-
-        Cout << ss.Str() << Endl;
-    }
 }
 
 } // namespace NKqp
diff --git a/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp
index ab41e0757ddf..d819c534255e 100644
--- a/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp
@@ -48,7 +48,7 @@ class TCheckDistributedEraseTxUnit : public TExecutionUnit {
         NKikimrTxDataShard::TEvEraseRowsResponse::EStatus status;
         TString error;
 
-        if (!TDirectTxErase::CheckRequest(&DataShard, request, status, error)) {
+        if (!TDirectTxErase::CheckRequest(&DataShard, request, status, error, ctx)) {
             return buildUnsuccessfulResult(error);
         }
 
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 761ea6e643a4..b002d5011dd9 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1,4 +1,5 @@
 #include "datashard_impl.h"
+#include "datashard_integrity_trails.h"
 #include "datashard_txs.h"
 #include "datashard_locks_db.h"
"memory_state_migration.h" @@ -4664,15 +4665,17 @@ class TBreakWriteConflictsTxObserverVolatileDependenciesGuard { TBreakWriteConflictsTxObserver* const Observer; }; -bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, - TArrayRef keyCells, absl::flat_hash_set& volatileDependencies) +bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, ui64 globalTxId, + TArrayRef keyCells, absl::flat_hash_set& volatileDependencies, const TActorContext& ctx) { const auto localTid = GetLocalTableId(tableId); Y_ABORT_UNLESS(localTid); if (auto* cached = GetConflictsCache().GetTableCache(localTid).FindUncommittedWrites(keyCells)) { for (ui64 txId : *cached) { - BreakWriteConflict(txId, volatileDependencies); + if (BreakWriteConflict(txId, volatileDependencies)) { + NDataIntegrity::LogIntegrityTrailsLocks(ctx, TabletID(), globalTxId, {txId}); + } } return true; } @@ -4699,14 +4702,17 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl return true; } -void TDataShard::BreakWriteConflict(ui64 txId, absl::flat_hash_set& volatileDependencies) { +bool TDataShard::BreakWriteConflict(ui64 txId, absl::flat_hash_set& volatileDependencies) { if (auto* info = GetVolatileTxManager().FindByCommitTxId(txId)) { if (info->State != EVolatileTxState::Aborting) { volatileDependencies.insert(txId); } - } else { - SysLocksTable().BreakLock(txId); + + return false; } + + SysLocksTable().BreakLock(txId); + return true; } class TDataShard::TTxGetOpenTxs : public NTabletFlatExecutor::TTransactionBase { diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index 84933653af3e..ae1b41491632 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -1,5 +1,6 @@ #include "change_collector.h" #include "datashard_common_upload.h" +#include "datashard_integrity_trails.h" #include "datashard_user_db.h" namespace NKikimr::NDataShard { @@ -15,7 +16,7 @@ TCommonUploadOps::TCommonUploadOps(typename TEvRequest: template bool TCommonUploadOps::Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId, - absl::flat_hash_set* volatileReadDependencies) + absl::flat_hash_set* volatileReadDependencies, const NActors::TActorContext& ctx) { const auto& record = Ev->Get()->Record; Result = MakeHolder(self->TabletID()); @@ -210,13 +211,14 @@ bool TCommonUploadOps::Execute(TDataShard* self, TTrans // produce inconsistency. 
             if (BreakLocks) {
                 if (breakWriteConflicts) {
-                    if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
+                    if (!self->BreakWriteConflicts(txc.DB, fullTableId, globalTxId, keyCells.GetCells(), volatileDependencies, ctx)) {
                         pageFault = true;
                     }
                 }
 
                 if (!pageFault) {
-                    self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
+                    auto locks = self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
+                    NDataIntegrity::LogIntegrityTrailsLocks(ctx, self->TabletID(), globalTxId, locks);
                 }
             }
 
diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h
index 0359770362af..d3b5f0c9436c 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.h
+++ b/ydb/core/tx/datashard/datashard_common_upload.h
@@ -23,7 +23,7 @@ class TCommonUploadOps {
 protected:
     bool Execute(TDataShard* self, TTransactionContext& txc,
             const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId,
-            absl::flat_hash_set<ui64>* volatileReadDependencies);
+            absl::flat_hash_set<ui64>* volatileReadDependencies, const NActors::TActorContext& ctx);
     void GetResult(TDataShard* self, TActorId& target, THolder<TEvResponse>& event, ui64& cookie);
     const TEvRequest* GetRequest() const;
     TEvResponse* GetResult();
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 143589c8e3bd..a356cba94807 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -1,5 +1,6 @@
 #include "change_collector.h"
 #include "datashard_direct_erase.h"
+#include "datashard_integrity_trails.h"
 #include "datashard_user_db.h"
 #include "erase_rows_condition.h"
 
@@ -19,7 +20,7 @@ TDirectTxErase::TDirectTxErase(TEvDataShard::TEvEraseRowsRequest::TPtr& ev)
 TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
     TDataShard* self, const TExecuteParams& params,
     const NKikimrTxDataShard::TEvEraseRowsRequest& request,
-    NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error)
+    NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error, const NActors::TActorContext& ctx)
 {
     const ui64 tableId = request.GetTableId();
     const TTableId fullTableId(self->GetPathOwnerId(), tableId);
@@ -140,7 +141,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
             }
 
             if (breakWriteConflicts) {
-                if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
+                if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, params.GlobalTxId, keyCells.GetCells(), volatileDependencies, ctx)) {
                     pageFault = true;
                 }
             }
@@ -165,7 +166,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
                 continue;
             }
 
-            self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
+            auto locks = self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
+            NDataIntegrity::LogIntegrityTrailsLocks(ctx, self->TabletID(), params.GlobalTxId, locks);
 
             if (!volatileDependencies.empty()) {
                 if (!params.GlobalTxId) {
@@ -217,9 +219,9 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
 }
 
 bool TDirectTxErase::CheckRequest(TDataShard* self, const NKikimrTxDataShard::TEvEraseRowsRequest& request,
-    NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error)
+    NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error, const NActors::TActorContext& ctx)
 {
-    const auto result = CheckedExecute(self, TExecuteParams::ForCheck(), request, status, error);
+    const auto result = CheckedExecute(self, TExecuteParams::ForCheck(), request, status, error, ctx);
     switch (result) {
         case EStatus::Success:
             return true;
@@ -232,7 +234,7 @@ bool TDirectTxErase::CheckRequest(TDataShard* self, const NKikimrTxDataShard::TE
 bool TDirectTxErase::Execute(TDataShard* self, TTransactionContext& txc,
         const TRowVersion& readVersion, const TRowVersion& writeVersion,
-        ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies)
+        ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies, const NActors::TActorContext& ctx)
 {
     const auto& record = Ev->Get()->Record;
 
@@ -244,7 +246,7 @@ bool TDirectTxErase::Execute(TDataShard* self, TTransactionContext& txc,
     NKikimrTxDataShard::TEvEraseRowsResponse::EStatus status;
     TString error;
 
-    const auto result = CheckedExecute(self, params, record, status, error);
+    const auto result = CheckedExecute(self, params, record, status, error, ctx);
     switch (result) {
         case EStatus::Success:
         case EStatus::Error:
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h
index 583834287fff..4ac63ddc665d 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.h
+++ b/ydb/core/tx/datashard/datashard_direct_erase.h
@@ -65,17 +65,17 @@ class TDirectTxErase : public IDirectTx {
     static EStatus CheckedExecute(
         TDataShard* self, const TExecuteParams& params,
         const NKikimrTxDataShard::TEvEraseRowsRequest& request,
-        NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error);
+        NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error, const NActors::TActorContext& ctx);
 
 public:
     explicit TDirectTxErase(TEvDataShard::TEvEraseRowsRequest::TPtr& ev);
 
     static bool CheckRequest(TDataShard* self, const NKikimrTxDataShard::TEvEraseRowsRequest& request,
-        NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error);
+        NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error, const NActors::TActorContext& ctx);
 
     bool Execute(TDataShard* self, TTransactionContext& txc,
         const TRowVersion& readVersion, const TRowVersion& writeVersion,
-        ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) override;
+        ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies, const NActors::TActorContext& ctx) override;
     TDirectTxResult GetResult(TDataShard* self) override;
     TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const override;
 };
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.cpp b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
index d4c5f6d2310e..7df6448a2a4b 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
@@ -30,12 +30,12 @@ void TDirectTransaction::BuildExecutionPlan(bool loaded)
     RewriteExecutionPlan(plan);
 }
 
-bool TDirectTransaction::Execute(TDataShard* self, TTransactionContext& txc) {
+bool TDirectTransaction::Execute(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) {
     auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);
 
     // NOTE: may throw TNeedGlobalTxId exception, which is handled in direct tx unit
     absl::flat_hash_set<ui64> volatileReadDependencies;
-    if (!Impl->Execute(self, txc, readVersion, writeVersion, GetGlobalTxId(), volatileReadDependencies)) {
+    if (!Impl->Execute(self, txc, readVersion, writeVersion, GetGlobalTxId(), volatileReadDependencies, ctx)) {
         if (!volatileReadDependencies.empty()) {
             for (ui64 txId : volatileReadDependencies) {
                 AddVolatileDependency(txId);
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.h b/ydb/core/tx/datashard/datashard_direct_transaction.h
index 6bdb518e351b..9bb2c3402021 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.h
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.h
@@ -22,7 +22,7 @@ class IDirectTx {
     virtual ~IDirectTx() = default;
     virtual bool Execute(TDataShard* self, TTransactionContext& txc,
         const TRowVersion& readVersion, const TRowVersion& writeVersion,
-        ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) = 0;
+        ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies, const NActors::TActorContext&) = 0;
     virtual TDirectTxResult GetResult(TDataShard* self) = 0;
     virtual TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const = 0;
 };
@@ -35,7 +35,7 @@ class TDirectTransaction : public TOperation {
     void BuildExecutionPlan(bool) override;
 
 private:
-    bool Execute(TDataShard* self, TTransactionContext& txc);
+    bool Execute(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
     void SendResult(TDataShard* self, const TActorContext& ctx);
     TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const;
 
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.cpp b/ydb/core/tx/datashard/datashard_direct_upload.cpp
index d0da2be607c2..2cd54d538824 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_upload.cpp
@@ -10,10 +10,10 @@ TDirectTxUpload::TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev)
 
 bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc,
         const TRowVersion& readVersion, const TRowVersion& writeVersion,
-        ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies)
+        ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies, const NActors::TActorContext& ctx)
 {
     return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion,
-        globalTxId, &volatileReadDependencies);
+        globalTxId, &volatileReadDependencies, ctx);
 }
 
 TDirectTxResult TDirectTxUpload::GetResult(TDataShard* self) {
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h
index 5452728cd421..9b19e0187b02 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.h
+++ b/ydb/core/tx/datashard/datashard_direct_upload.h
@@ -16,7 +16,7 @@ class TDirectTxUpload : public IDirectTx
 
     bool Execute(TDataShard* self, TTransactionContext& txc,
         const TRowVersion& readVersion, const TRowVersion& writeVersion,
-        ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies) override;
+        ui64 globalTxId, absl::flat_hash_set<ui64>& volatileReadDependencies, const NActors::TActorContext& ctx) override;
     TDirectTxResult GetResult(TDataShard* self) override;
     TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const override;
 };
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 3bb70f6291a0..d111a55b029c 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -2126,8 +2126,8 @@ class TDataShard
      *
      * Returns true on success and false on page fault.
      */
-    bool BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId,
-            TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies);
+    bool BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, ui64 globalTxId,
+            TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies, const TActorContext& ctx);
 
     /**
      * Handles a specific write conflict txId
@@ -2137,7 +2137,7 @@ class TDataShard
      *
      * Either adds txId to volatile dependencies or breaks a known write lock.
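+     * Returns true when a known write lock was broken, false when the
+     * conflict was handed to the volatile tx manager instead.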
      */
-    void BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies);
+    bool BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies);
 
     enum ELogThrottlerType {
         CheckDataTxUnit_Execute = 0,
diff --git a/ydb/core/tx/datashard/datashard_integrity_trails.h b/ydb/core/tx/datashard/datashard_integrity_trails.h
index de0a65569af4..9c742cf615f1 100644
--- a/ydb/core/tx/datashard/datashard_integrity_trails.h
+++ b/ydb/core/tx/datashard/datashard_integrity_trails.h
@@ -126,6 +126,32 @@ inline void LogIntegrityTrailsKeys(const NActors::TActorContext& ctx, const ui64
     }
 }
 
+inline void LogIntegrityTrailsLocks(const NActors::TActorContext& ctx, const ui64 tabletId, const ui64 txId, const TVector<ui64>& locks) {
+    if (locks.empty()) {
+        return;
+    }
+
+    auto logFn = [&]() {
+        TStringStream ss;
+
+        LogKeyValue("Component", "DataShard", ss);
+        LogKeyValue("Type", "Locks", ss);
+        LogKeyValue("TabletId", ToString(tabletId), ss);
+        LogKeyValue("PhyTxId", ToString(txId), ss);
+
+        ss << "BreakLocks: [";
+        for (const auto& lock : locks) {
+            ss << lock << " ";
+        }
+        ss << "]";
+
+        return ss.Str();
+    };
+
+    LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, logFn());
+}
+
 template <typename TxResult>
 inline void LogIntegrityTrailsFinish(const NActors::TActorContext& ctx, const ui64 tabletId, const ui64 txId, const typename TxResult::EStatus status) {
     auto logFn = [&]() {
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 2d1941cb8bc0..f0f3c4a86728 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -1,5 +1,6 @@
 #include "datashard_kqp.h"
 #include "datashard_impl.h"
+#include "datashard_integrity_trails.h"
 #include "datashard_user_db.h"
 
 #include
@@ -843,12 +844,14 @@ void KqpCommitLocks(ui64 origin, const NKikimrDataEvents::TKqpLocks* kqpLocks, T
             LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLock " << lockProto.ShortDebugString());
 
             auto lockKey = MakeLockKey(lockProto);
-            sysLocks.CommitLock(lockKey);
+            auto breakLocks = sysLocks.CommitLock(lockKey);
 
             TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId());
             auto txId = lockProto.GetLockId();
 
             userDb.CommitChanges(tableId, txId, writeVersion);
+
+            NDataIntegrity::LogIntegrityTrailsLocks(TlsActivationContext->AsActorContext(), origin, txId, breakLocks);
         }
     } else {
         KqpEraseLocks(origin, kqpLocks, sysLocks);
diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp
index c1a19939f1ee..58f726434fad 100644
--- a/ydb/core/tx/datashard/datashard_repl_apply.cpp
+++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp
@@ -201,7 +201,7 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShard>
-            Self->SysLocksTable().BreakLocks(tableId, keyCellVec.GetCells());
+            Self->SysLocksTable().BreakLocks(tableId, keyCellVec.GetCells()); // TODO: log lock
             txc.DB.Update(userTable.LocalTid, rop, key, update, *MvccReadWriteVersion);
             Self->GetConflictsCache().GetTableCache(userTable.LocalTid).RemoveUncommittedWrites(keyCellVec.GetCells(), txc.DB);
         }
diff --git a/ydb/core/tx/datashard/datashard_s3_upload_rows.cpp b/ydb/core/tx/datashard/datashard_s3_upload_rows.cpp
index 0181d2842246..e7e8d6143004 100644
--- a/ydb/core/tx/datashard/datashard_s3_upload_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_s3_upload_rows.cpp
@@ -8,12 +8,12 @@ TDataShard::TTxS3UploadRows::TTxS3UploadRows(TDataShard* ds, TEvDataShard::TEvS3
 {
 }
 
-bool TDataShard::TTxS3UploadRows::Execute(TTransactionContext& txc, const TActorContext&) {
+bool TDataShard::TTxS3UploadRows::Execute(TTransactionContext& txc, const TActorContext& ctx) {
     auto [readVersion, writeVersion] = Self->GetReadWriteVersions();
 
     // NOTE: will not throw TNeedGlobalTxId since we set breakLocks to false
     if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion,
-            /* globalTxId */ 0, /* volatile read dependencies */ nullptr))
+            /* globalTxId */ 0, /* volatile read dependencies */ nullptr, ctx))
     {
         return false;
     }
 
diff --git a/ydb/core/tx/datashard/datashard_user_db.cpp b/ydb/core/tx/datashard/datashard_user_db.cpp
index c178ba6364b5..329bbe4e2591 100644
--- a/ydb/core/tx/datashard/datashard_user_db.cpp
+++ b/ydb/core/tx/datashard/datashard_user_db.cpp
@@ -1,6 +1,7 @@
 #include "datashard_user_db.h"
 
 #include "datashard_impl.h"
+#include "datashard_integrity_trails.h"
 
 namespace NKikimr::NDataShard {
 
@@ -203,7 +204,8 @@ void TDataShardUserDb::UpsertRowInt(
     if (LockTxId) {
         Self.SysLocksTable().SetWriteLock(tableId, keyCells);
     } else {
-        Self.SysLocksTable().BreakLocks(tableId, keyCells);
+        auto locks = Self.SysLocksTable().BreakLocks(tableId, keyCells);
+        NDataIntegrity::LogIntegrityTrailsLocks(TlsActivationContext->AsActorContext(), Self.TabletID(), GlobalTxId, locks);
     }
 
     Self.SetTableUpdateTime(tableId, Now);
@@ -658,6 +660,7 @@ void TDataShardUserDb::BreakWriteConflict(ui64 txId) {
     } else {
         // Break uncommitted locks
         Self.SysLocksTable().BreakLock(txId);
+        NDataIntegrity::LogIntegrityTrailsLocks(TlsActivationContext->AsActorContext(), Self.TabletID(), GlobalTxId, {txId});
     }
 }
 
diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp
index b47187c5daa8..5fdccf51a60a 100644
--- a/ydb/core/tx/datashard/direct_tx_unit.cpp
+++ b/ydb/core/tx/datashard/direct_tx_unit.cpp
@@ -41,7 +41,7 @@ class TDirectOpUnit : public TExecutionUnit {
         Y_ABORT_UNLESS(tx != nullptr);
 
         try {
-            if (!tx->Execute(&DataShard, txc)) {
+            if (!tx->Execute(&DataShard, txc, ctx)) {
                 return EExecutionStatus::Restart;
             }
         } catch (const TNeedGlobalTxId&) {
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index 226773c9dbfa..632ce7e421f6 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -2,6 +2,7 @@
 #include "datashard_active_transaction.h"
 #include "datashard_distributed_erase.h"
 #include "datashard_impl.h"
+#include "datashard_integrity_trails.h"
 #include "datashard_pipeline.h"
 #include "datashard_user_db.h"
 #include "execution_unit_ctors.h"
@@ -51,7 +52,7 @@ class TExecuteDistributedEraseTxUnit : public TExecutionUnit {
             THolder<IDataShardChangeCollector> changeCollector{CreateChangeCollector(DataShard, userDb, groupProvider, txc.DB, request.GetTableId())};
 
             auto presentRows = TDynBitMap().Set(0, request.KeyColumnsSize());
-            if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, op->GetGlobalTxId(),
+            if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, op->GetGlobalTxId(), ctx,
                     &userDb, &groupProvider, changeCollector.Get()))
             {
                 return EExecutionStatus::Restart;
@@ -94,7 +95,7 @@ class TExecuteDistributedEraseTxUnit : public TExecutionUnit {
                 Y_ABORT_UNLESS(presentRows.contains(rs.Origin));
 
                 auto confirmedRows = DeserializeBitMap<TDynBitMap>(body.GetConfirmedRows());
-                if (!Execute(txc, request, presentRows.at(rs.Origin), confirmedRows, writeVersion, op->GetGlobalTxId())) {
+                if (!Execute(txc, request, presentRows.at(rs.Origin), confirmedRows, writeVersion, op->GetGlobalTxId(), ctx)) {
                     return EExecutionStatus::Restart;
                 }
             }
@@ -120,7 +121,7 @@ class TExecuteDistributedEraseTxUnit : public TExecutionUnit {
     bool Execute(TTransactionContext& txc, const NKikimrTxDataShard::TEvEraseRowsRequest& request,
             const TDynBitMap& presentRows, const TDynBitMap& confirmedRows, const TRowVersion& writeVersion,
-            ui64 globalTxId,
+            ui64 globalTxId, const TActorContext& ctx,
             TDataShardUserDb* userDb = nullptr,
             TDataShardChangeGroupProvider* groupProvider = nullptr,
             IDataShardChangeCollector* changeCollector = nullptr)
@@ -159,7 +160,7 @@ class TExecuteDistributedEraseTxUnit : public TExecutionUnit {
             }
 
             if (breakWriteConflicts || checkVolatileDependencies) {
-                if (!DataShard.BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
+                if (!DataShard.BreakWriteConflicts(txc.DB, fullTableId, globalTxId, keyCells.GetCells(), volatileDependencies, ctx)) {
                     if (breakWriteConflicts) {
                         pageFault = true;
                     } else if (checkVolatileDependencies) {
@@ -186,7 +187,8 @@ class TExecuteDistributedEraseTxUnit : public TExecutionUnit {
                 continue;
             }
 
-            DataShard.SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
+            auto locks = DataShard.SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
+            NDataIntegrity::LogIntegrityTrailsLocks(ctx, DataShard.TabletID(), globalTxId, locks);
 
             if (!volatileDependencies.empty() || volatileOrdered) {
                 txc.DB.UpdateTx(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, globalTxId);
diff --git a/ydb/core/tx/locks/locks.cpp b/ydb/core/tx/locks/locks.cpp
index 8b2b6ef03b5d..0230aba9e8ae 100644
--- a/ydb/core/tx/locks/locks.cpp
+++ b/ydb/core/tx/locks/locks.cpp
@@ -1185,16 +1185,20 @@ void TSysLocks::EraseLock(const TArrayRef<const TCell>& key) {
     }
 }
 
-void TSysLocks::CommitLock(const TArrayRef<const TCell>& key) {
+TVector<ui64> TSysLocks::CommitLock(const TArrayRef<const TCell>& key) {
     Y_ABORT_UNLESS(Update);
+    TVector<ui64> locks;
     if (auto* lock = Locker.FindLockPtr(GetLockId(key))) {
         for (auto& pr : lock->ConflictLocks) {
             if (!!(pr.second & ELockConflictFlags::BreakThemOnOurCommit)) {
                 Update->AddBreakLock(pr.first);
+                locks.push_back(pr.first->LockId);
             }
         }
         Update->AddEraseLock(lock);
     }
+
+    return locks;
 }
 
 void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key) {
@@ -1238,14 +1242,16 @@ void TSysLocks::BreakLock(ui64 lockId) {
     }
 }
 
-void TSysLocks::BreakLocks(const TTableId& tableId, const TArrayRef<const TCell>& key) {
+TVector<ui64> TSysLocks::BreakLocks(const TTableId& tableId, const TArrayRef<const TCell>& key) {
     Y_ABORT_UNLESS(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks)));
 
+    TVector<ui64> breakLockIds;
     if (auto* table = Locker.FindTablePtr(tableId)) {
         if (table->HasRangeLocks()) {
             // Note: avoid copying the key, find all locks here
-            table->Ranges.EachIntersection(key, [update = Update](const TRangeTreeBase::TRange&, TLockInfo* lock) {
+            table->Ranges.EachIntersection(key, [update = Update, &breakLockIds](const TRangeTreeBase::TRange&, TLockInfo* lock) {
                 update->AddBreakLock(lock);
+                breakLockIds.push_back(lock->LockId);
             });
         }
         if (table->HasShardLocks()) {
@@ -1253,6 +1259,8 @@ void TSysLocks::BreakLocks(const TTableId& tableId, const TArrayRef
             Update->AddBreakShardLocks(table);
         }
     }
+
+    return breakLockIds;
 }
 
 void TSysLocks::AddReadConflict(ui64 conflictId) {
diff --git a/ydb/core/tx/locks/locks.h b/ydb/core/tx/locks/locks.h
index 76c486123aa0..c3e6371dd38c 100644
--- a/ydb/core/tx/locks/locks.h
+++ b/ydb/core/tx/locks/locks.h
@@ -883,12 +883,12 @@ class TSysLocks {
     TLock GetLock(const TArrayRef<const TCell>& syslockKey) const;
     void EraseLock(ui64 lockId);
     void EraseLock(const TArrayRef<const TCell>& syslockKey);
-    void CommitLock(const TArrayRef<const TCell>& syslockKey);
+    TVector<ui64> CommitLock(const TArrayRef<const TCell>& syslockKey);
     void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key);
     void SetLock(const TTableId& tableId, const TTableRange& range);
     void SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key);
     void BreakLock(ui64 lockId);
-    void BreakLocks(const TTableId& tableId, const TArrayRef<const TCell>& key);
+    TVector<ui64> BreakLocks(const TTableId& tableId, const TArrayRef<const TCell>& key);
     void AddReadConflict(ui64 conflictId);
     void AddWriteConflict(ui64 conflictId);
     void AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key);