Skip to content

Commit

Permalink
feat(data_integrity_trails): add logging of acquired locks
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Feb 20, 2025
1 parent d222a17 commit d366dba
Show file tree
Hide file tree
Showing 24 changed files with 206 additions and 98 deletions.
97 changes: 93 additions & 4 deletions ydb/core/kqp/common/kqp_data_integrity_trails.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <library/cpp/string_utils/base64/base64.h>

#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/datashard/datashard.h>

namespace NKikimr {
namespace NDataIntegrity {
Expand Down Expand Up @@ -97,23 +99,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction
}

// DataExecuter
inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) {
inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) {
TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", "Request", ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(txId), ss);
LogKeyValue("Locks", "[" + txLocksDebugStr + "]", ss);

if (shardId) {
LogKeyValue("ShardId", ToString(*shardId), ss);
}

LogKeyValue("Type", type, ss, /*last*/ true);
LogKeyValue("TxType", type, ss, /*last*/ true);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId));
}

inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) {
auto log = [](const auto& state, const auto& traceId, const auto& ev) {
const auto& record = ev->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", "Response", ss);
LogKeyValue("State", state, ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);

TStringBuilder locksDebugStr;
locksDebugStr << "[";
for (const auto& lock : record.GetTxLocks()) {
locksDebugStr << lock.ShortDebugString();
}
locksDebugStr << "]";

LogKeyValue("Locks", locksDebugStr, ss);
LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss);

NYql::TIssues issues;
NYql::IssuesFromMessage(record.GetIssues(), issues);
LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
}

inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) {
auto log = [](const auto& state, const auto& traceId, const auto& ev) {
const auto& record = ev->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", "Response", ss);
LogKeyValue("State", state, ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);

TStringBuilder locksDebugStr;
locksDebugStr << "[";
for (const auto& lock : record.GetTxLocks()) {
locksDebugStr << lock.ShortDebugString();
}
locksDebugStr << "]";

LogKeyValue("Locks", locksDebugStr, ss);
LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss);
LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
}

template <typename TActorResultInfo>
inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) {
auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) {
TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", type, ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(txId), ss);

TStringBuilder locksDebugStr;
locksDebugStr << "[";
for (const auto& lock : info.GetLocks()) {
locksDebugStr << lock.ShortDebugString();
}
locksDebugStr << "]";

LogKeyValue("Locks", locksDebugStr, ss);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId));
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info));
}

// WriteActor,BufferActor
Expand Down
14 changes: 11 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
NDataIntegrity::LogIntegrityTrails("InputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
for (auto& lock : info.GetLocks()) {
if (!TxManager) {
Locks.push_back(lock);
Expand All @@ -246,6 +247,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
NKikimrKqp::TEvKqpOutputActorResultInfo info;
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
NDataIntegrity::LogIntegrityTrails("OutputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
for (auto& lock : info.GetLocks()) {
if (!TxManager) {
Locks.push_back(lock);
Expand Down Expand Up @@ -523,6 +525,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState, "Unexpected propose result from unknown tabletId " << shardId);

NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Got propose result, shard: " << shardId << ", status: "
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
<< ", error: " << res->GetError());
Expand Down Expand Up @@ -592,6 +595,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::TIssues issues;
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);

NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
Expand Down Expand Up @@ -1122,7 +1126,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
transaction.SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
}

NDataIntegrity::LogIntegrityTrails("PlannedTx", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());

LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards");
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true));
Expand Down Expand Up @@ -1263,6 +1267,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::TIssues issues;
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);

NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
Expand Down Expand Up @@ -1334,6 +1339,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState);

NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Got propose result, shard: " << shardId << ", status: "
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
<< ", error: " << res->GetError());
Expand Down Expand Up @@ -1824,7 +1830,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
flags));
}

NDataIntegrity::LogIntegrityTrails("DatashardTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());

ResponseEv->Orbit.Fork(evData->Orbit);
ev = std::move(evData);
Expand Down Expand Up @@ -1860,7 +1867,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

auto traceId = ExecuterSpan.GetTraceId();

NDataIntegrity::LogIntegrityTrails("EvWriteTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());

auto shardsToString = [](const auto& shards) {
TStringBuilder builder;
Expand Down
45 changes: 2 additions & 43 deletions ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
}

// check executer logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 1 : 0);
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), LogEnabled ? 2 : 0);
// check grpc logs
Expand Down Expand Up @@ -204,55 +204,14 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
}

// check executer logs (should be empty, because executer only logs modification operations)
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 0);
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
// check grpc logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
// check datashard logs (should be empty, because DataShard only logs modification operations)
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0);
}

Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) {
TStringStream ss;
{
TKikimrSettings serverSettings;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());


const auto query = R"(
--!syntax_v1
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)";

if (Streaming) {
auto result = client.StreamExecuteYqlScript(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CollectStreamResult(result);
} else {
auto result = client.ExecuteYqlScript(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
}

// check executer logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
// check session actor logs (should contain double logs because this query was executed via worker actor)
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 4);
// check grpc logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
// check datashard logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);

Cout << ss.Str() << Endl;
}
}

} // namespace NKqp
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TCheckDistributedEraseTxUnit : public TExecutionUnit {

NKikimrTxDataShard::TEvEraseRowsResponse::EStatus status;
TString error;
if (!TDirectTxErase::CheckRequest(&DataShard, request, status, error)) {
if (!TDirectTxErase::CheckRequest(&DataShard, request, status, error, ctx)) {
return buildUnsuccessfulResult(error);
}

Expand Down
18 changes: 12 additions & 6 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_integrity_trails.h"
#include "datashard_txs.h"
#include "datashard_locks_db.h"
#include "memory_state_migration.h"
Expand Down Expand Up @@ -4664,15 +4665,17 @@ class TBreakWriteConflictsTxObserverVolatileDependenciesGuard {
TBreakWriteConflictsTxObserver* const Observer;
};

bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId,
TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies)
bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, ui64 globalTxId,
TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies, const TActorContext& ctx)
{
const auto localTid = GetLocalTableId(tableId);
Y_ABORT_UNLESS(localTid);

if (auto* cached = GetConflictsCache().GetTableCache(localTid).FindUncommittedWrites(keyCells)) {
for (ui64 txId : *cached) {
BreakWriteConflict(txId, volatileDependencies);
if (BreakWriteConflict(txId, volatileDependencies)) {
NDataIntegrity::LogIntegrityTrailsLocks(ctx, TabletID(), globalTxId, {txId});
}
}
return true;
}
Expand All @@ -4699,14 +4702,17 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl
return true;
}

void TDataShard::BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies) {
bool TDataShard::BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies) {
if (auto* info = GetVolatileTxManager().FindByCommitTxId(txId)) {
if (info->State != EVolatileTxState::Aborting) {
volatileDependencies.insert(txId);
}
} else {
SysLocksTable().BreakLock(txId);

return false;
}

SysLocksTable().BreakLock(txId);
return true;
}

class TDataShard::TTxGetOpenTxs : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard__read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2931,7 +2931,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase

bool isBroken = state.Lock->IsBroken();
if (!isBroken && (Reader->HadInvisibleRowSkips() || Reader->HadInconsistentResult())) {
sysLocks.BreakLock(state.Lock->GetLockId());
sysLocks.BreakLock(state.Lock->GetLockId()); // TODO: log lock
sysLocks.ApplyLocks();
Y_ABORT_UNLESS(state.Lock->IsBroken());
isBroken = true;
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/datashard/datashard_common_upload.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "change_collector.h"
#include "datashard_common_upload.h"
#include "datashard_integrity_trails.h"
#include "datashard_user_db.h"

namespace NKikimr::NDataShard {
Expand All @@ -15,7 +16,7 @@ TCommonUploadOps<TEvRequest, TEvResponse>::TCommonUploadOps(typename TEvRequest:
template <typename TEvRequest, typename TEvResponse>
bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTransactionContext& txc,
const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId,
absl::flat_hash_set<ui64>* volatileReadDependencies)
absl::flat_hash_set<ui64>* volatileReadDependencies, const NActors::TActorContext& ctx)
{
const auto& record = Ev->Get()->Record;
Result = MakeHolder<TEvResponse>(self->TabletID());
Expand Down Expand Up @@ -210,13 +211,14 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
// produce inconsistency.
if (BreakLocks) {
if (breakWriteConflicts) {
if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
if (!self->BreakWriteConflicts(txc.DB, fullTableId, globalTxId, keyCells.GetCells(), volatileDependencies, ctx)) {
pageFault = true;
}
}

if (!pageFault) {
self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
auto locks = self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
NDataIntegrity::LogIntegrityTrailsLocks(ctx, self->TabletID(), globalTxId, locks);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_common_upload.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class TCommonUploadOps {
protected:
bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion,
const TRowVersion& writeVersion, ui64 globalTxId,
absl::flat_hash_set<ui64>* volatileReadDependencies);
absl::flat_hash_set<ui64>* volatileReadDependencies, const NActors::TActorContext& ctx);
void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie);
const TEvRequest* GetRequest() const;
TEvResponse* GetResult();
Expand Down
Loading

0 comments on commit d366dba

Please sign in to comment.