diff --git a/ydb/core/grpc_services/query/rpc_kqp_tx.cpp b/ydb/core/grpc_services/query/rpc_kqp_tx.cpp index 4f92c00faff7..d71caa0f428e 100644 --- a/ydb/core/grpc_services/query/rpc_kqp_tx.cpp +++ b/ydb/core/grpc_services/query/rpc_kqp_tx.cpp @@ -124,6 +124,7 @@ class TBeginTransactionRPC : public TActorBootstrapped { if (kqpResponse.HasTxMeta()) { beginTxResult->mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id()); } + *beginTxResult->mutable_issues() = issueMessage; } Reply(record.GetYdbStatus(), beginTxResult); @@ -168,7 +169,7 @@ class TFinishTransactionRPC : public TActorBootstrapped private: virtual std::pair GetReqData() const = 0; virtual void Fill(NKikimrKqp::TQueryRequest* req) const = 0; - virtual NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const = 0; + virtual NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const = 0; void StateWork(TAutoPtr& ev) { try { @@ -218,15 +219,15 @@ class TFinishTransactionRPC : public TActorBootstrapped const auto& record = ev->Get()->Record.GetRef(); FillCommonKqpRespFields(record, Request.get()); + NYql::TIssues issues; if (record.HasResponse()) { const auto& kqpResponse = record.GetResponse(); const auto& issueMessage = kqpResponse.GetQueryIssues(); - NYql::TIssues issues; NYql::IssuesFromMessage(issueMessage, issues); Request->RaiseIssues(issues); } - Reply(record.GetYdbStatus(), CreateResult(record.GetYdbStatus())); + Reply(record.GetYdbStatus(), CreateResult(record.GetYdbStatus(), issues)); } void InternalError(const TString& message) { @@ -271,9 +272,10 @@ class TCommitTransactionRPC : public TFinishTransactionRPC { req->MutableTxControl()->set_commit_tx(true); } - NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const override { + NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const override { auto result = TEvCommitTransactionRequest::AllocateResult(Request); result->set_status(status); + NYql::IssuesToMessage(issues, result->mutable_issues()); return result; } }; @@ -293,9 +295,10 @@ class TRollbackTransactionRPC : public TFinishTransactionRPC { req->SetAction(NKikimrKqp::QUERY_ACTION_ROLLBACK_TX); } - NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const override { + NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const override { auto result = TEvRollbackTransactionRequest::AllocateResult(Request); result->set_status(status); + NYql::IssuesToMessage(issues, result->mutable_issues()); return result; } }; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index 00ef3782ce37..73f4c86398b9 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -125,6 +125,25 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) { } AFL_ENSURE(state->State == EShardState::Running)("state", state->State)("actor_id", state->ActorId)("ev_sender", ev->Sender); + AFL_DEBUG(NKikimrServices::KQP_COMPUTE) + ("Recv TEvScanData from ShardID=", ev->Sender) + ("ScanId", ev->Get()->ScanId) + ("Finished", ev->Get()->Finished) + ("Lock", [&]() { + TStringBuilder builder; + for (const auto& lock : ev->Get()->LocksInfo.Locks) { + builder << lock.ShortDebugString(); + } + return builder; + }()) + ("BrokenLocks", [&]() { + TStringBuilder builder; + for (const auto& lock : ev->Get()->LocksInfo.BrokenLocks) { + builder << lock.ShortDebugString(); + } + return builder; + }()); + TInstant startTime = TActivationContext::Now(); if (ev->Get()->Finished) { state->State = EShardState::PostRunning; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 1bf3848752f0..50a8e1a8686a 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -462,10 +462,16 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetIssues(), issues); - LOG_D("Recv EvWriteResult from ShardID=" << shardId + LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() - << ", LocksCount= " << ev->Get()->Record.GetTxLocks().size() + << ", Locks= " << [&]() { + TStringBuilder builder; + for (const auto& lock : ev->Get()->Record.GetTxLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }() << ", Cookie=" << ev->Cookie << ", error=" << issues.ToString()); @@ -486,6 +492,18 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.DebugString()); + YQL_ENSURE(shardState->State == TShardState::EState::Preparing); + Counters->TxProxyMon->TxResultAborted->Inc(); + LocksBroken = true; + + YQL_ENSURE(!res->Record.GetTxLocks().empty()); + ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( + res->Record.GetTxLocks(0).GetSchemeShard(), + res->Record.GetTxLocks(0).GetPathId()); + ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); + } default: { return ShardError(res->Record); @@ -863,6 +881,7 @@ class TKqpDataExecuter : public TKqpExecuterBase(); ev->Record.SetCoordinatorID(TxCoordinator); @@ -1133,10 +1153,16 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetIssues(), issues); - LOG_D("Recv EvWriteResult from ShardID=" << shardId + LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() - << ", LocksCount= " << ev->Get()->Record.GetTxLocks().size() + << ", Locks= " << [&]() { + TStringBuilder builder; + for (const auto& lock : ev->Get()->Record.GetTxLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }() << ", Cookie=" << ev->Cookie << ", error=" << issues.ToString()); @@ -1167,16 +1193,11 @@ class TKqpDataExecuter : public TKqpExecuterBaseState = TShardState::EState::Finished; Counters->TxProxyMon->TxResultAborted->Inc(); LocksBroken = true; - - if (!res->Record.GetTxLocks().empty()) { - ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( - res->Record.GetTxLocks(0).GetSchemeShard(), - res->Record.GetTxLocks(0).GetPathId()); - return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); - } - - CheckExecutionComplete(); - return; + YQL_ENSURE(!res->Record.GetTxLocks().empty()); + ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( + res->Record.GetTxLocks(0).GetSchemeShard(), + res->Record.GetTxLocks(0).GetPathId()); + ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); } default: { @@ -1722,7 +1743,13 @@ class TKqpDataExecuter : public TKqpExecuterBase, public NYql::NDq return; } + CA_LOG_D("Recv TEvReadResult from ShardID=" << Reads[id].Shard->TabletId + << ", ReadId=" << id + << ", Status=" << Ydb::StatusIds::StatusCode_Name(record.GetStatus().GetCode()) + << ", Finished=" << record.GetFinished() + << ", RowCount=" << record.GetRowCount() + << ", TxLocks= " << [&]() { + TStringBuilder builder; + for (const auto& lock : record.GetTxLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }() + << ", BrokenTxLocks= " << [&]() { + TStringBuilder builder; + for (const auto& lock : record.GetBrokenTxLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }()); + if (!record.HasNodeId()) { Counters->ReadActorAbsentNodeId->Inc(); } else if (record.GetNodeId() != SelfId().NodeId()) { diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 8daf7ff79b46..f2b737832356 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -277,8 +277,6 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedGet()->Record; - CA_LOG_D("TEvReadResult was received for table: " << StreamLookupWorker->GetTablePath() << - ", readId: " << record.GetReadId() << ", finished: " << record.GetFinished()); auto readIt = Reads.find(record.GetReadId()); if (readIt == Reads.end() || readIt->second.State != EReadState::Running) { @@ -288,6 +286,27 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedsecond; + CA_LOG_D("Recv TEvReadResult (stream lookup) from ShardID=" << read.ShardId + << ", Table = " << StreamLookupWorker->GetTablePath() + << ", ReadId=" << record.GetReadId() + << ", Status=" << Ydb::StatusIds::StatusCode_Name(record.GetStatus().GetCode()) + << ", Finished=" << record.GetFinished() + << ", RowCount=" << record.GetRowCount() + << ", TxLocks= " << [&]() { + TStringBuilder builder; + for (const auto& lock : record.GetTxLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }() + << ", BrokenTxLocks= " << [&]() { + TStringBuilder builder; + for (const auto& lock : record.GetBrokenTxLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }()); + for (auto& lock : record.GetBrokenTxLocks()) { BrokenLocks.push_back(lock); } diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index eda264583b89..e95d62c979d1 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -374,7 +374,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu CA_LOG_D("Recv EvWriteResult from ShardID=" << ev->Get()->Record.GetOrigin() << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() - << ", LocksCount= " << ev->Get()->Record.GetTxLocks().size() + << ", Locks= " << [&]() { + TStringBuilder builder; + for (const auto& lock : ev->Get()->Record.GetTxLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }() << ", Cookie=" << ev->Cookie); switch (ev->Get()->GetStatus()) { @@ -526,7 +532,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu CA_LOG_D("Got completed result TxId=" << ev->Get()->Record.GetTxId() << ", TabletId=" << ev->Get()->Record.GetOrigin() << ", Cookie=" << ev->Cookie - << ", LocksCount=" << ev->Get()->Record.GetTxLocks().size()); + << ", Locks=" << [&]() { + TStringBuilder builder; + for (const auto& lock : ev->Get()->Record.GetTxLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }()); OnMessageAcknowledged(ev->Get()->Record.GetOrigin(), ev->Cookie); @@ -625,7 +637,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", TxId=" << evWrite->Record.GetTxId() << ", TxMode=" << evWrite->Record.GetTxMode() << ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId() - << ", LocksCount= " << evWrite->Record.GetLocks().LocksSize() + << ", Locks= " << [&]() { + TStringBuilder builder; + for (const auto& lock : evWrite->Record.GetLocks().GetLocks()) { + builder << lock.ShortDebugString(); + } + return builder; + }() << ", Size=" << serializationResult.TotalDataSize << ", Cookie=" << metadata->Cookie << ", OperationsCount=" << metadata->OperationsCount << ", IsFinal=" << metadata->IsFinal << ", Attempts=" << metadata->SendAttempts); diff --git a/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp index 899ecc384176..e95f91df791a 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp @@ -43,10 +43,12 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); result.GetIssues().PrintTo(Cerr); -// UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, -// [] (const NYql::TIssue& issue) { -// return issue.GetMessage().Contains("/Root/Test"); -// })); + if (!GetIsOlap()) { + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), result.GetIssues().ToString()); + } result = session2.ExecuteQuery(Q_(R"( SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; @@ -96,13 +98,13 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { auto commitResult = tx1->Commit().GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString()); commitResult.GetIssues().PrintTo(Cerr); - // TODO: - //UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, - // [] (const NYql::TIssue& issue) { - // Y_UNUSED(issue); - // return issue.GetMessage().Contains("/Root/Test"); - // return true; - // }), commitResult.GetIssues().ToString()); + UNIT_ASSERT_C(commitResult.GetIssues().Size() != 0, commitResult.GetIssues().ToString()); + if (!GetIsOlap()) { + UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), commitResult.GetIssues().ToString()); + } result = session2.ExecuteQuery(Q_(R"( SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; @@ -196,10 +198,12 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); result.GetIssues().PrintTo(Cerr); - //UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, - // [] (const NYql::TIssue& issue) { - // return issue.GetMessage().Contains("/Root/Test"); - // })); + if (!GetIsOlap()) { + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), result.GetIssues().ToString()); + } result = session1.ExecuteQuery(Q1_(R"( SELECT * FROM Test WHERE Group = 11; @@ -255,12 +259,12 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); result.GetIssues().PrintTo(Cerr); -// UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED)); - -// UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, -// [] (const NYql::TIssue& issue) { -// return issue.GetMessage().Contains("/Root/Test"); -// })); + if (!GetIsOlap()) { + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), result.GetIssues().ToString()); + } result = session1.ExecuteQuery(Q1_(R"( SELECT * FROM Test WHERE Group = 11; diff --git a/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp index d66293531ff4..a0ac82ad151d 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp @@ -227,7 +227,9 @@ Y_UNIT_TEST_SUITE(KqpSinkMvcc) { )"), TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); -// UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString()); + if (!GetIsOlap()) { + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString()); + } } }; diff --git a/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp index 36a994b9d16d..4959cf240e05 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp @@ -96,7 +96,7 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) { commitResult = tx.Commit().ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::NOT_FOUND, commitResult.GetIssues().ToString()); - //TODO: UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND), commitResult.GetIssues().ToString()); + UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND), commitResult.GetIssues().ToString()); } }; @@ -145,8 +145,10 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) { UPDATE `/Root/KV` SET Value = "third" WHERE Key = 4; )"), TTxControl::Tx(tx->GetId())).ExtractValueSync(); if (GetIsOlap()) { + // Olap has Reads in this query, so it breaks now. UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); } else { + // Oltp doesn't have Reads in this query, so it breaks later. UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } @@ -155,7 +157,7 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) { if (GetIsOlap()) { UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::NOT_FOUND, commitResult.GetIssues().ToString()); } else { - UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString()); } } }; @@ -186,14 +188,13 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) { auto result = session.ExecuteQuery(Q_(R"( INSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "New"); )"), TTxControl::Tx(tx.GetId())).ExtractValueSync(); - // result.GetIssues().PrintTo(Cerr); - //TODO: UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + result.GetIssues().PrintTo(Cerr); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); result = session.ExecuteQuery(Q_(R"( UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "New"); )"), TTxControl::Tx(tx.GetId())).ExtractValueSync(); - // result.GetIssues().PrintTo(Cerr); + result.GetIssues().PrintTo(Cerr); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::NOT_FOUND, result.GetIssues().ToString()); } };