Skip to content

Commit

Permalink
Merge c0deeaf into 1643e7a
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 6, 2024
2 parents 1643e7a + c0deeaf commit b66ff67
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 52 deletions.
13 changes: 8 additions & 5 deletions ydb/core/grpc_services/query/rpc_kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class TBeginTransactionRPC : public TActorBootstrapped<TBeginTransactionRPC> {
if (kqpResponse.HasTxMeta()) {
beginTxResult->mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
}
*beginTxResult->mutable_issues() = issueMessage;
}

Reply(record.GetYdbStatus(), beginTxResult);
Expand Down Expand Up @@ -168,7 +169,7 @@ class TFinishTransactionRPC : public TActorBootstrapped<TFinishTransactionRPC>
private:
virtual std::pair<TString, TString> GetReqData() const = 0;
virtual void Fill(NKikimrKqp::TQueryRequest* req) const = 0;
virtual NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const = 0;
virtual NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const = 0;

void StateWork(TAutoPtr<IEventHandle>& ev) {
try {
Expand Down Expand Up @@ -218,15 +219,15 @@ class TFinishTransactionRPC : public TActorBootstrapped<TFinishTransactionRPC>
const auto& record = ev->Get()->Record.GetRef();
FillCommonKqpRespFields(record, Request.get());

NYql::TIssues issues;
if (record.HasResponse()) {
const auto& kqpResponse = record.GetResponse();
const auto& issueMessage = kqpResponse.GetQueryIssues();
NYql::TIssues issues;
NYql::IssuesFromMessage(issueMessage, issues);
Request->RaiseIssues(issues);
}

Reply(record.GetYdbStatus(), CreateResult(record.GetYdbStatus()));
Reply(record.GetYdbStatus(), CreateResult(record.GetYdbStatus(), issues));
}

void InternalError(const TString& message) {
Expand Down Expand Up @@ -271,9 +272,10 @@ class TCommitTransactionRPC : public TFinishTransactionRPC {
req->MutableTxControl()->set_commit_tx(true);
}

NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const override {
NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const override {
auto result = TEvCommitTransactionRequest::AllocateResult<Ydb::Query::CommitTransactionResponse>(Request);
result->set_status(status);
NYql::IssuesToMessage(issues, result->mutable_issues());
return result;
}
};
Expand All @@ -293,9 +295,10 @@ class TRollbackTransactionRPC : public TFinishTransactionRPC {
req->SetAction(NKikimrKqp::QUERY_ACTION_ROLLBACK_TX);
}

NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const override {
NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const override {
auto result = TEvRollbackTransactionRequest::AllocateResult<Ydb::Query::RollbackTransactionResponse>(Request);
result->set_status(status);
NYql::IssuesToMessage(issues, result->mutable_issues());
return result;
}
};
Expand Down
19 changes: 19 additions & 0 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,25 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
}
AFL_ENSURE(state->State == EShardState::Running)("state", state->State)("actor_id", state->ActorId)("ev_sender", ev->Sender);

AFL_DEBUG(NKikimrServices::KQP_COMPUTE)
("Recv TEvScanData from ShardID=", ev->Sender)
("ScanId", ev->Get()->ScanId)
("Finished", ev->Get()->Finished)
("Lock", [&]() {
TStringBuilder builder;
for (const auto& lock : ev->Get()->LocksInfo.Locks) {
builder << lock.ShortDebugString();
}
return builder;
}())
("BrokenLocks", [&]() {
TStringBuilder builder;
for (const auto& lock : ev->Get()->LocksInfo.BrokenLocks) {
builder << lock.ShortDebugString();
}
return builder;
}());

TInstant startTime = TActivationContext::Now();
if (ev->Get()->Finished) {
state->State = EShardState::PostRunning;
Expand Down
57 changes: 42 additions & 15 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::TIssues issues;
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);

LOG_D("Recv EvWriteResult from ShardID=" << shardId
LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
<< ", LocksCount= " << ev->Get()->Record.GetTxLocks().size()
<< ", Locks= " << [&]() {
TStringBuilder builder;
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
builder << lock.ShortDebugString();
}
return builder;
}()
<< ", Cookie=" << ev->Cookie
<< ", error=" << issues.ToString());

Expand All @@ -486,6 +492,18 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED: {
YQL_ENSURE(false);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
LOG_D("Broken locks: " << res->Record.DebugString());
YQL_ENSURE(shardState->State == TShardState::EState::Preparing);
Counters->TxProxyMon->TxResultAborted->Inc();
LocksBroken = true;

YQL_ENSURE(!res->Record.GetTxLocks().empty());
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
res->Record.GetTxLocks(0).GetSchemeShard(),
res->Record.GetTxLocks(0).GetPathId());
ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
}
default:
{
return ShardError(res->Record);
Expand Down Expand Up @@ -863,6 +881,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, issues);
}
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
issues.AddIssue(NYql::YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated."));
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issues);
}
}
Expand Down Expand Up @@ -923,6 +942,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

void ExecutePlanned() {
YQL_ENSURE(!LocksBroken);
YQL_ENSURE(TxCoordinator);
auto ev = MakeHolder<TEvTxProxy::TEvProposeTransaction>();
ev->Record.SetCoordinatorID(TxCoordinator);
Expand Down Expand Up @@ -1133,10 +1153,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::TIssues issues;
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);

LOG_D("Recv EvWriteResult from ShardID=" << shardId
LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
<< ", LocksCount= " << ev->Get()->Record.GetTxLocks().size()
<< ", Locks= " << [&]() {
TStringBuilder builder;
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
builder << lock.ShortDebugString();
}
return builder;
}()
<< ", Cookie=" << ev->Cookie
<< ", error=" << issues.ToString());

Expand Down Expand Up @@ -1167,16 +1193,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
shardState->State = TShardState::EState::Finished;
Counters->TxProxyMon->TxResultAborted->Inc();
LocksBroken = true;

if (!res->Record.GetTxLocks().empty()) {
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
res->Record.GetTxLocks(0).GetSchemeShard(),
res->Record.GetTxLocks(0).GetPathId());
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
}

CheckExecutionComplete();
return;
YQL_ENSURE(!res->Record.GetTxLocks().empty());
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
res->Record.GetTxLocks(0).GetSchemeShard(),
res->Record.GetTxLocks(0).GetPathId());
ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
}
default:
{
Expand Down Expand Up @@ -1722,7 +1743,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
<< ", LocksOp=" << NKikimrDataEvents::TKqpLocks::ELocksOp_Name(evWriteTransaction->Record.GetLocks().GetOp())
<< ", SendingShards=" << shardsToString(evWriteTransaction->Record.GetLocks().GetSendingShards())
<< ", ReceivingShards=" << shardsToString(evWriteTransaction->Record.GetLocks().GetReceivingShards())
<< ", LocksCount= " << evWriteTransaction->Record.GetLocks().LocksSize());
<< ", Locks= " << [&]() {
TStringBuilder builder;
for (const auto& lock : evWriteTransaction->Record.GetLocks().GetLocks()) {
builder << lock.ShortDebugString();
}
return builder;
}());

LOG_D("ExecuteEvWriteTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity()));

Expand Down
20 changes: 20 additions & 0 deletions ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,26 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
return;
}

CA_LOG_D("Recv TEvReadResult from ShardID=" << Reads[id].Shard->TabletId
<< ", ReadId=" << id
<< ", Status=" << Ydb::StatusIds::StatusCode_Name(record.GetStatus().GetCode())
<< ", Finished=" << record.GetFinished()
<< ", RowCount=" << record.GetRowCount()
<< ", TxLocks= " << [&]() {
TStringBuilder builder;
for (const auto& lock : record.GetTxLocks()) {
builder << lock.ShortDebugString();
}
return builder;
}()
<< ", BrokenTxLocks= " << [&]() {
TStringBuilder builder;
for (const auto& lock : record.GetBrokenTxLocks()) {
builder << lock.ShortDebugString();
}
return builder;
}());

if (!record.HasNodeId()) {
Counters->ReadActorAbsentNodeId->Inc();
} else if (record.GetNodeId() != SelfId().NodeId()) {
Expand Down
23 changes: 21 additions & 2 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,6 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
const auto& record = ev->Get()->Record;

CA_LOG_D("TEvReadResult was received for table: " << StreamLookupWorker->GetTablePath() <<
", readId: " << record.GetReadId() << ", finished: " << record.GetFinished());

auto readIt = Reads.find(record.GetReadId());
if (readIt == Reads.end() || readIt->second.State != EReadState::Running) {
Expand All @@ -288,6 +286,27 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku

auto& read = readIt->second;

CA_LOG_D("Recv TEvReadResult (stream lookup) from ShardID=" << read.ShardId
<< ", Table = " << StreamLookupWorker->GetTablePath()
<< ", ReadId=" << record.GetReadId()
<< ", Status=" << Ydb::StatusIds::StatusCode_Name(record.GetStatus().GetCode())
<< ", Finished=" << record.GetFinished()
<< ", RowCount=" << record.GetRowCount()
<< ", TxLocks= " << [&]() {
TStringBuilder builder;
for (const auto& lock : record.GetTxLocks()) {
builder << lock.ShortDebugString();
}
return builder;
}()
<< ", BrokenTxLocks= " << [&]() {
TStringBuilder builder;
for (const auto& lock : record.GetBrokenTxLocks()) {
builder << lock.ShortDebugString();
}
return builder;
}());

for (auto& lock : record.GetBrokenTxLocks()) {
BrokenLocks.push_back(lock);
}
Expand Down
24 changes: 21 additions & 3 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
CA_LOG_D("Recv EvWriteResult from ShardID=" << ev->Get()->Record.GetOrigin()
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
<< ", LocksCount= " << ev->Get()->Record.GetTxLocks().size()
<< ", Locks= " << [&]() {
TStringBuilder builder;
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
builder << lock.ShortDebugString();
}
return builder;
}()
<< ", Cookie=" << ev->Cookie);

switch (ev->Get()->GetStatus()) {
Expand Down Expand Up @@ -526,7 +532,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
CA_LOG_D("Got completed result TxId=" << ev->Get()->Record.GetTxId()
<< ", TabletId=" << ev->Get()->Record.GetOrigin()
<< ", Cookie=" << ev->Cookie
<< ", LocksCount=" << ev->Get()->Record.GetTxLocks().size());
<< ", Locks=" << [&]() {
TStringBuilder builder;
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
builder << lock.ShortDebugString();
}
return builder;
}());

OnMessageAcknowledged(ev->Get()->Record.GetOrigin(), ev->Cookie);

Expand Down Expand Up @@ -625,7 +637,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", TxId=" << evWrite->Record.GetTxId()
<< ", TxMode=" << evWrite->Record.GetTxMode()
<< ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId()
<< ", LocksCount= " << evWrite->Record.GetLocks().LocksSize()
<< ", Locks= " << [&]() {
TStringBuilder builder;
for (const auto& lock : evWrite->Record.GetLocks().GetLocks()) {
builder << lock.ShortDebugString();
}
return builder;
}()
<< ", Size=" << serializationResult.TotalDataSize << ", Cookie=" << metadata->Cookie
<< ", OperationsCount=" << metadata->OperationsCount << ", IsFinal=" << metadata->IsFinal
<< ", Attempts=" << metadata->SendAttempts);
Expand Down
46 changes: 25 additions & 21 deletions ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) {
)"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
result.GetIssues().PrintTo(Cerr);
// UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
// [] (const NYql::TIssue& issue) {
// return issue.GetMessage().Contains("/Root/Test");
// }));
if (!GetIsOlap()) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
[] (const NYql::TIssue& issue) {
return issue.GetMessage().Contains("/Root/Test");
}), result.GetIssues().ToString());
}

result = session2.ExecuteQuery(Q_(R"(
SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
Expand Down Expand Up @@ -96,13 +98,13 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) {
auto commitResult = tx1->Commit().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString());
commitResult.GetIssues().PrintTo(Cerr);
// TODO:
//UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
// [] (const NYql::TIssue& issue) {
// Y_UNUSED(issue);
// return issue.GetMessage().Contains("/Root/Test");
// return true;
// }), commitResult.GetIssues().ToString());
UNIT_ASSERT_C(commitResult.GetIssues().Size() != 0, commitResult.GetIssues().ToString());
if (!GetIsOlap()) {
UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
[] (const NYql::TIssue& issue) {
return issue.GetMessage().Contains("/Root/Test");
}), commitResult.GetIssues().ToString());
}

result = session2.ExecuteQuery(Q_(R"(
SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
Expand Down Expand Up @@ -196,10 +198,12 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) {
)"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
result.GetIssues().PrintTo(Cerr);
//UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
// [] (const NYql::TIssue& issue) {
// return issue.GetMessage().Contains("/Root/Test");
// }));
if (!GetIsOlap()) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
[] (const NYql::TIssue& issue) {
return issue.GetMessage().Contains("/Root/Test");
}), result.GetIssues().ToString());
}

result = session1.ExecuteQuery(Q1_(R"(
SELECT * FROM Test WHERE Group = 11;
Expand Down Expand Up @@ -255,12 +259,12 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) {
)"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
result.GetIssues().PrintTo(Cerr);
// UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED));

// UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
// [] (const NYql::TIssue& issue) {
// return issue.GetMessage().Contains("/Root/Test");
// }));
if (!GetIsOlap()) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
[] (const NYql::TIssue& issue) {
return issue.GetMessage().Contains("/Root/Test");
}), result.GetIssues().ToString());
}

result = session1.ExecuteQuery(Q1_(R"(
SELECT * FROM Test WHERE Group = 11;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ Y_UNIT_TEST_SUITE(KqpSinkMvcc) {
)"), TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
// UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString());
if (!GetIsOlap()) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString());
}
}
};

Expand Down
Loading

0 comments on commit b66ff67

Please sign in to comment.