diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 0a7c0ff0d000..782796b1ed0f 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -815,7 +815,7 @@ void TPersQueue::ReadTxInfo(const NKikimrClient::TKeyValueResponse::TReadResult& switch (read.GetStatus()) { case NKikimrProto::OK: { - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " has a tx info"); + PQ_LOG_D("has a tx info"); NKikimrPQ::TTabletTxInfo info; Y_ABORT_UNLESS(info.ParseFromString(read.GetValue())); @@ -825,7 +825,7 @@ void TPersQueue::ReadTxInfo(const NKikimrClient::TKeyValueResponse::TReadResult& break; } case NKikimrProto::NODATA: { - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " doesn't have tx info"); + PQ_LOG_D("doesn't have tx info"); InitPlanStep(); @@ -1027,7 +1027,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& Y_ABORT_UNLESS(TopicName.size(), "Need topic name here"); ctx.Send(CacheActor, new TEvPQ::TEvChangeCacheConfig(TopicName, cacheSize)); } else if (read.GetStatus() == NKikimrProto::NODATA) { - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " no config, start with empty partitions and default config"); + PQ_LOG_D("no config, start with empty partitions and default config"); } else { PQ_LOG_ERROR_AND_DIE("Unexpected config read status: " << read.GetStatus()); return; @@ -1412,6 +1412,9 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c void TPersQueue::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvPQ::TEvError" << + " Cookie " << ev->Get()->Cookie << + ", Error " << ev->Get()->Error); auto it = ResponseProxy.find(ev->Get()->Cookie); if (it == ResponseProxy.end()) @@ -1447,6 +1450,7 @@ void TPersQueue::FinishResponse(THashMap>::iter void TPersQueue::Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvPersQueue::TEvUpdateConfig"); if (!ConfigInited) { UpdateConfigRequests.emplace_back(ev->Release(), ev->Sender); return; @@ -1457,6 +1461,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorCon void TPersQueue::Handle(TEvPQ::TEvPartitionConfigChanged::TPtr&, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvPQ::TEvPartitionConfigChanged"); + Y_ABORT_UNLESS(ChangePartitionConfigInflight > 0); --ChangePartitionConfigInflight; @@ -1698,6 +1704,8 @@ void TPersQueue::ClearNewConfig() void TPersQueue::Handle(TEvPersQueue::TEvDropTablet::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvPersQueue::TEvDropTablet"); + auto& record = ev->Get()->Record; ui64 txId = record.GetTxId(); @@ -2837,6 +2845,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& void TPersQueue::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvTabletPipe::TEvServerConnected"); + auto it = PipesInfo.insert({ev->Get()->ClientId, {}}).first; it->second.ServerActors++; LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " server connected, pipe " @@ -2848,6 +2858,8 @@ void TPersQueue::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActo void TPersQueue::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvTabletPipe::TEvServerDisconnected"); + //inform partition if needed; auto it = PipesInfo.find(ev->Get()->ClientId); if (it != PipesInfo.end()) { @@ -2871,6 +2883,8 @@ void TPersQueue::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TA void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvTabletPipe::TEvClientConnected"); + Y_ABORT_UNLESS(ev->Get()->Leader, "Unexpectedly connected to follower of tablet %" PRIu64, ev->Get()->TabletId); if (PipeClientCache->OnConnect(ev)) { @@ -2885,6 +2899,8 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActo void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvTabletPipe::TEvClientDestroyed"); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Client pipe to tablet " << ev->Get()->TabletId << " is reset"); @@ -3032,14 +3048,15 @@ void TPersQueue::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, MediatorTimeCastEntry = message->Entry; Y_ABORT_UNLESS(MediatorTimeCastEntry); - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << - "Registered with mediator time cast"); + PQ_LOG_D("Registered with mediator time cast"); TryWriteTxs(ctx); } void TPersQueue::Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvInterconnect::TEvNodeInfo"); + Y_ABORT_UNLESS(ev->Get()->Node); DCId = ev->Get()->Node->Location.GetDataCenterId(); ResourceMetrics = Executor()->GetResourceMetrics(); @@ -3098,7 +3115,7 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx) void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPersQueue::TEvCancelTransactionProposal"); + PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal"); NKikimrPQ::TEvCancelTransactionProposal& event = ev->Get()->Record; Y_ABORT_UNLESS(event.HasTxId()); @@ -3114,7 +3131,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << ev->Get()->Record.DebugString()); + PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << ev->Get()->Record.ShortDebugString()); NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record; switch (event.GetTxBodyCase()) { @@ -3267,14 +3284,7 @@ void TPersQueue::HandleConfigTransaction(TAutoPtrGet()->Record; - - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, - "Tablet: " << TabletID() << - ", PlanStep: " << event.GetStep() << - ", Mediator: " << event.GetMediatorID()); + PQ_LOG_D("Handle TEvTxProcessing::TEvPlanStep " << ev->Get()->Record.ShortDebugString()); EvPlanStepQueue.emplace_back(ev->Sender, ev->Release().Release()); @@ -3283,7 +3293,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorCont void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvTxProcessing::TEvReadSet"); + PQ_LOG_D("Handle TEvTxProcessing::TEvReadSet " << ev->Get()->Record.ShortDebugString()); NKikimrTx::TEvReadSet& event = ev->Get()->Record; Y_ABORT_UNLESS(event.HasTxId()); @@ -3293,7 +3303,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte ack = std::make_unique(*ev->Get(), TabletID()); } - if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->Senders.contains(event.GetTabletProducer())) { + if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->PredicatesReceived.contains(event.GetTabletProducer())) { tx->OnReadSet(event, ev->Sender, std::move(ack)); if (tx->State == NKikimrPQ::TTransaction::WAIT_RS) { @@ -3302,6 +3312,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte TryWriteTxs(ctx); } } else if (ack) { + PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer()); // // для неизвестных транзакций подтверждение отправляется сразу // @@ -3311,7 +3322,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte void TPersQueue::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvTxProcessing::TEvReadSetAck"); + PQ_LOG_D("Handle TEvTxProcessing::TEvReadSetAck " << ev->Get()->Record.ShortDebugString()); NKikimrTx::TEvReadSetAck& event = ev->Get()->Record; Y_ABORT_UNLESS(event.HasTxId()); @@ -3335,13 +3346,11 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC { const TEvPQ::TEvTxCalcPredicateResult& event = *ev->Get(); - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, - "Tablet " << TabletID() << - " Handle TEvPQ::TEvTxCalcPredicateResult" << - " Step " << event.Step << - " TxId " << event.TxId << - " Partition " << event.Partition << - " Predicate " << (event.Predicate ? "true" : "false")); + PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicateResult" << + " Step " << event.Step << + ", TxId " << event.TxId << + ", Partition " << event.Partition << + ", Predicate " << (event.Predicate ? "true" : "false")); auto tx = GetTransaction(ctx, event.TxId); if (!tx) { @@ -3359,6 +3368,11 @@ void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const { const TEvPQ::TEvProposePartitionConfigResult& event = *ev->Get(); + PQ_LOG_D("Handle TEvPQ::TEvProposePartitionConfigResult" << + " Step " << event.Step << + ", TxId " << event.TxId << + ", Partition " << event.Partition); + auto tx = GetTransaction(ctx, event.TxId); if (!tx) { return; @@ -3375,10 +3389,13 @@ void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const void TPersQueue::Handle(TEvPQ::TEvTxCommitDone::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPQ::TEvTxCommitDone"); - const TEvPQ::TEvTxCommitDone& event = *ev->Get(); + PQ_LOG_D("Handle TEvPQ::TEvTxCommitDone" << + " Step " << event.Step << + ", TxId " << event.TxId << + ", Partition " << event.Partition); + auto tx = GetTransaction(ctx, event.TxId); if (!tx) { return; @@ -3475,6 +3492,7 @@ void TPersQueue::BeginWriteTxs(const TActorContext& ctx) PendingSupportivePartitions = std::move(NewSupportivePartitions); NewSupportivePartitions.clear(); + PQ_LOG_D("Send TEvKeyValue::TEvRequest (WRITE_TX_COOKIE)"); ctx.Send(ctx.SelfID, request.Release()); TryReturnTabletStateAll(ctx); @@ -3665,7 +3683,7 @@ void TPersQueue::ProcessDeleteTxs(const TActorContext& ctx, tx->AddCmdDelete(request); - Txs.erase(tx->TxId); + ChangedTxs.insert(tx->TxId); } DeleteTxs.clear(); @@ -3773,8 +3791,8 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, TString body; Y_ABORT_UNLESS(data.SerializeToString(&body)); - PQ_LOG_D("Send TEvTxProcessing::TEvReadSet to " << tx.Receivers.size() << " receivers. Wait TEvTxProcessing::TEvReadSet from " << tx.Senders.size() << " senders."); - for (ui64 receiverId : tx.Receivers) { + PQ_LOG_D("Send TEvTxProcessing::TEvReadSet to " << tx.PredicateRecipients.size() << " receivers. Wait TEvTxProcessing::TEvReadSet from " << tx.PredicatesReceived.size() << " senders."); + for (auto& [receiverId, _] : tx.PredicateRecipients) { if (receiverId != TabletID()) { auto event = std::make_unique(tx.Step, tx.TxId, @@ -3783,6 +3801,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, TabletID(), body, 0); + PQ_LOG_D("Send TEvReadSet to tablet " << receiverId); SendToPipe(receiverId, tx, std::move(event), ctx); } } @@ -3792,6 +3811,7 @@ void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx, TDistributedTransaction& tx) { for (auto& [target, event] : tx.ReadSetAcks) { + PQ_LOG_D("Send TEvTxProcessing::TEvReadSetAck " << event->ToString()); ctx.Send(target, event.release()); } } @@ -3969,11 +3989,10 @@ const THashSet& TPersQueue::GetBindedTxs(ui64 tabletId) TDistributedTransaction* TPersQueue::GetTransaction(const TActorContext& ctx, ui64 txId) { + Y_UNUSED(ctx); auto p = Txs.find(txId); if (p == Txs.end()) { - LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, - "Tablet " << TabletID() << - " Unknown transaction " << txId); + PQ_LOG_W("Unknown transaction " << txId); return nullptr; } return &p->second; @@ -3982,24 +4001,25 @@ TDistributedTransaction* TPersQueue::GetTransaction(const TActorContext& ctx, void TPersQueue::CheckTxState(const TActorContext& ctx, TDistributedTransaction& tx) { + PQ_LOG_D("TxId " << tx.TxId << + ", State " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + switch (tx.State) { case NKikimrPQ::TTransaction::UNKNOWN: Y_ABORT_UNLESS(tx.TxId != Max()); - PQ_LOG_T("TxId="<< tx.TxId << ", State=UNKNOWN"); - WriteTx(tx, NKikimrPQ::TTransaction::PREPARED); ScheduleProposeTransactionResult(tx); tx.State = NKikimrPQ::TTransaction::PREPARING; + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); break; case NKikimrPQ::TTransaction::PREPARING: Y_ABORT_UNLESS(tx.WriteInProgress); - PQ_LOG_T("TxId="<< tx.TxId << ", State=PREPARING"); - tx.WriteInProgress = false; // @@ -4007,25 +4027,25 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, // tx.State = NKikimrPQ::TTransaction::PREPARED; + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); break; case NKikimrPQ::TTransaction::PREPARED: Y_ABORT_UNLESS(tx.Step != Max()); - PQ_LOG_T("TxId="<< tx.TxId << ", State=PREPARED"); - WriteTx(tx, NKikimrPQ::TTransaction::PLANNED); tx.State = NKikimrPQ::TTransaction::PLANNING; + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); break; case NKikimrPQ::TTransaction::PLANNING: Y_ABORT_UNLESS(tx.WriteInProgress); - PQ_LOG_T("TxId="<< tx.TxId << ", State=PLANNING"); - tx.WriteInProgress = false; // @@ -4033,16 +4053,17 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, // tx.State = NKikimrPQ::TTransaction::PLANNED; + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); [[fallthrough]]; case NKikimrPQ::TTransaction::PLANNED: - PQ_LOG_T("TxId="<< tx.TxId << ", State=PLANNED" << - ", (!TxQueue.empty())=" << !TxQueue.empty()); + PQ_LOG_D("TxQueue.size " << TxQueue.size()); if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) { std::tie(ExecStep, ExecTxId) = TxQueue.front(); - PQ_LOG_D("ExecStep " << ExecStep << ", ExecTxId " << ExecTxId); + PQ_LOG_D("New ExecStep " << ExecStep << ", ExecTxId " << ExecTxId); switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: @@ -4065,6 +4086,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, } tx.State = NKikimrPQ::TTransaction::CALCULATING; + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); } break; @@ -4072,26 +4095,19 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::CALCULATING: Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected); - PQ_LOG_T("TxId="<< tx.TxId << ", State=CALCULATING" << - ", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount << - ", tx.PartitionRepliesExpected=" << tx.PartitionRepliesExpected); + PQ_LOG_D("Received " << tx.PartitionRepliesCount << + ", Expected " << tx.PartitionRepliesExpected); if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) { switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: - SendEvReadSetToReceivers(ctx, tx); - + case NKikimrPQ::TTransaction::KIND_CONFIG: WriteTx(tx, NKikimrPQ::TTransaction::WAIT_RS); tx.State = NKikimrPQ::TTransaction::CALCULATED; - break; - - case NKikimrPQ::TTransaction::KIND_CONFIG: - SendEvReadSetToReceivers(ctx, tx); + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); - tx.State = NKikimrPQ::TTransaction::WAIT_RS; - - CheckTxState(ctx, tx); break; case NKikimrPQ::TTransaction::KIND_UNKNOWN: @@ -4104,11 +4120,11 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::CALCULATED: Y_ABORT_UNLESS(tx.WriteInProgress); - PQ_LOG_T("TxId="<< tx.TxId << ", State=CALCULATED"); - tx.WriteInProgress = false; tx.State = NKikimrPQ::TTransaction::WAIT_RS; + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); [[fallthrough]]; @@ -4117,10 +4133,13 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, // the number of TEvReadSetAck sent should not be greater than the number of senders // from TEvProposeTransaction // - Y_ABORT_UNLESS(tx.ReadSetAcks.size() <= tx.Senders.size()); + Y_ABORT_UNLESS(tx.ReadSetAcks.size() <= tx.PredicatesReceived.size(), + "tx.ReadSetAcks.size=%" PRISZT ", tx.PredicatesReceived.size=%" PRISZT, + tx.ReadSetAcks.size(), tx.PredicatesReceived.size()); - PQ_LOG_T("TxId="<< tx.TxId << ", State=WAIT_RS" << - ", tx.HaveParticipantsDecision()=" << tx.HaveParticipantsDecision()); + SendEvReadSetToReceivers(ctx, tx); + + PQ_LOG_D("HaveParticipantsDecision " << tx.HaveParticipantsDecision()); if (tx.HaveParticipantsDecision()) { if (tx.GetDecision() == NKikimrTx::TReadSetData::DECISION_COMMIT) { @@ -4130,6 +4149,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, } tx.State = NKikimrPQ::TTransaction::EXECUTING; + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); } else { break; } @@ -4139,9 +4160,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::EXECUTING: Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected); - PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTING" << - ", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount << - ", tx.PartitionRepliesExpected=" << tx.PartitionRepliesExpected); + PQ_LOG_D("Received " << tx.PartitionRepliesCount << + ", Expected " << tx.PartitionRepliesExpected); + if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) { Y_ABORT_UNLESS(!TxQueue.empty()); Y_ABORT_UNLESS(TxQueue.front().second == tx.TxId); @@ -4150,10 +4171,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: - SendEvReadSetAckToSenders(ctx, tx); break; case NKikimrPQ::TTransaction::KIND_CONFIG: - SendEvReadSetAckToSenders(ctx, tx); ApplyNewConfig(tx.TabletConfig, ctx); TabletConfigTx = tx.TabletConfig; BootstrapConfigTx = tx.BootstrapConfig; @@ -4163,9 +4182,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, } tx.State = NKikimrPQ::TTransaction::EXECUTED; - - TxQueue.pop(); - TryStartTransaction(ctx); + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); } else { break; } @@ -4173,7 +4191,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, [[fallthrough]]; case NKikimrPQ::TTransaction::EXECUTED: - PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTED, tx.HaveAllRecipientsReceive()=" << tx.HaveAllRecipientsReceive()); + PQ_LOG_D("HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive()); if (tx.HaveAllRecipientsReceive()) { if (tx.WriteId.Defined()) { BeginDeleteTx(tx); @@ -4183,6 +4201,19 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, } break; + + case NKikimrPQ::TTransaction::DELETING: + // The PQ tablet has persisted its state. Now she can delete the transaction and take the next one. + SendEvReadSetAckToSenders(ctx, tx); + if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) { + TxQueue.pop(); + TryStartTransaction(ctx); + } + Txs.erase(tx.TxId); + // If this was the last transaction, then you need to send responses to messages about changes + // in the status of the PQ tablet (if they came) + TryReturnTabletStateAll(ctx); + break; } } @@ -4195,8 +4226,14 @@ void TPersQueue::WriteTx(TDistributedTransaction& tx, NKikimrPQ::TTransaction::E void TPersQueue::DeleteTx(TDistributedTransaction& tx) { + PQ_LOG_D("add an TxId " << tx.TxId << " to the list for deletion"); + DeleteTxs.insert(tx.TxId); + tx.State = NKikimrPQ::TTransaction::DELETING; + PQ_LOG_D("TxId " << tx.TxId << + ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); + tx.WriteInProgress = true; } @@ -4378,7 +4415,6 @@ void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadR TxQueue.clear(); std::deque> plannedTxs; - const auto& ctx = ActorContext(); for (size_t i = 0; i < readRange.PairSize(); ++i) { auto& pair = readRange.GetPair(i); @@ -4386,8 +4422,7 @@ void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadR NKikimrPQ::TTransaction tx; Y_ABORT_UNLESS(tx.ParseFromString(pair.GetValue())); - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " " << - "Tx: " << tx.DebugString()); + PQ_LOG_D("Load tx " << tx.ShortDebugString()); Txs.emplace(tx.GetTxId(), tx); @@ -4404,8 +4439,7 @@ void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadR } if (!TxQueue.empty()) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " " << - "top tx queue (" << TxQueue.front().first << ", " << TxQueue.front().second << ")"); + PQ_LOG_D("top tx queue (" << TxQueue.front().first << ", " << TxQueue.front().second << ")"); } Y_UNUSED(partitionTxs); @@ -4414,6 +4448,7 @@ void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadR void TPersQueue::TryStartTransaction(const TActorContext& ctx) { if (TxQueue.empty()) { + PQ_LOG_D("empty tx queue"); return; } @@ -4463,6 +4498,8 @@ void TPersQueue::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, const TActorContext &ctx) { + PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransactionAttach " << ev->Get()->Record.ShortDebugString()); + const ui64 txId = ev->Get()->Record.GetTxId(); NKikimrProto::EReplyStatus status = NKikimrProto::NODATA; @@ -4518,13 +4555,16 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti } } -void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext&) +void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvLongTxService::TEvLockStatus " << ev->Get()->Record.ShortDebugString()); + auto& record = ev->Get()->Record; const TWriteId writeId(record.GetLockNode(), record.GetLockId()); if (!TxWrites.contains(writeId)) { // the transaction has already been completed + PQ_LOG_D("unknown WriteId " << writeId); return; } @@ -4532,12 +4572,25 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e writeInfo.LongTxSubscriptionStatus = record.GetStatus(); if (writeInfo.LongTxSubscriptionStatus == NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) { + PQ_LOG_D("subscribed WriteId " << writeId); return; } + PQ_LOG_D("delete write info for WriteId " << writeId << " and TxId " << writeInfo.TxId); + if (!writeInfo.TxId.Defined()) { + PQ_LOG_D("delete write info for WriteId " << writeId); // the message TEvProposeTransaction will not come anymore BeginDeletePartitions(writeInfo); + return; + } + + ui64 txId = *writeInfo.TxId; + PQ_LOG_D("delete write info for WriteId " << writeId << " and TxId " << txId); + + auto* tx = GetTransaction(ctx, txId); + if (!tx || (tx->State == NKikimrPQ::TTransaction::EXECUTED)) { + BeginDeletePartitions(writeInfo); } } @@ -4557,6 +4610,8 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const T void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvPQ::TEvDeletePartitionDone " << ev->Get()->PartitionId); + auto* event = ev->Get(); Y_ABORT_UNLESS(event->PartitionId.WriteId.Defined()); const TWriteId& writeId = *event->PartitionId.WriteId; @@ -4605,13 +4660,16 @@ void TPersQueue::BeginDeleteTx(const TDistributedTransaction& tx) { Y_ABORT_UNLESS(tx.WriteId.Defined()); const TWriteId& writeId = *tx.WriteId; + PQ_LOG_D("begin delete write info for WriteId " << writeId); if (!TxWrites.contains(writeId)) { // the transaction has already been completed + PQ_LOG_D("unknown WriteId " << writeId); return; } TTxWriteInfo& writeInfo = TxWrites.at(writeId); if (writeInfo.LongTxSubscriptionStatus == NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) { + PQ_LOG_D("wait for WriteId subscription status"); return; } @@ -4621,11 +4679,13 @@ void TPersQueue::BeginDeleteTx(const TDistributedTransaction& tx) void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo) { if (writeInfo.Deleting) { + PQ_LOG_D("Already deleting WriteInfo"); return; } for (auto& [_, partitionId] : writeInfo.Partitions) { Y_ABORT_UNLESS(Partitions.contains(partitionId)); const TPartitionInfo& partition = Partitions.at(partitionId); + PQ_LOG_D("send TEvPQ::TEvDeletePartition to partition " << partitionId); Send(partition.Actor, new TEvPQ::TEvDeletePartition); } writeInfo.Deleting = true; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 50fc8e21bfc6..be8775389307 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -16,11 +16,29 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& MinStep = tx.GetMinStep(); MaxStep = tx.GetMaxStep(); - for (ui64 tabletId : tx.GetSenders()) { - Senders.insert(tabletId); + ReadSetCount = 0; + + for (auto& p : tx.GetPredicatesReceived()) { + PredicatesReceived[p.GetTabletId()] = p; + + if (p.HasPredicate()) { + SetDecision(ParticipantsDecision, + p.GetPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT); + ++ReadSetCount; + } + } + + PredicateAcksCount = 0; + + for (ui64 tabletId : tx.GetPredicateRecipients()) { + PredicateRecipients[tabletId] = false; } - for (ui64 tabletId : tx.GetReceivers()) { - Receivers.insert(tabletId); + + if (tx.HasPredicate()) { + SelfDecision = + tx.GetPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT; + } else { + SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; } switch (Kind) { @@ -34,15 +52,6 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& Y_FAIL_S("unknown transaction type"); } - if (tx.HasSelfPredicate()) { - SelfDecision = - tx.GetSelfPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT; - } - if (tx.HasAggrPredicate()) { - ParticipantsDecision = - tx.GetAggrPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT; - } - Y_ABORT_UNLESS(tx.HasSourceActor()); SourceActor = ActorIdFromProto(tx.GetSourceActor()); @@ -115,6 +124,11 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr Y_FAIL_S("unknown TxBody case"); } + PartitionRepliesCount = 0; + PartitionRepliesExpected = 0; + + ReadSetCount = 0; + Y_ABORT_UNLESS(event.HasSourceActor()); SourceActor = ActorIdFromProto(event.GetSourceActor()); } @@ -124,15 +138,15 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac { Kind = NKikimrPQ::TTransaction::KIND_DATA; - for (ui64 tablet : txBody.GetSendingShards()) { - if (tablet != extractTabletId) { - Senders.insert(tablet); + for (ui64 tabletId : txBody.GetSendingShards()) { + if (tabletId != extractTabletId) { + PredicatesReceived[tabletId].SetTabletId(tabletId); } } - for (ui64 tablet : txBody.GetReceivingShards()) { - if (tablet != extractTabletId) { - Receivers.insert(tablet); + for (ui64 tabletId : txBody.GetReceivingShards()) { + if (tabletId != extractTabletId) { + PredicateRecipients[tabletId] = false; } } @@ -143,11 +157,6 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac } else { WriteId = Nothing(); } - - PartitionRepliesCount = 0; - PartitionRepliesExpected = 0; - - ReadSetCount = 0; } void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody, @@ -172,7 +181,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans if (node->Children.empty()) { for (const auto* r : node->Parents) { if (extractTabletId != r->TabletId) { - Senders.insert(r->TabletId); + PredicatesReceived[r->TabletId].SetTabletId(r->TabletId); } } } @@ -180,18 +189,13 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans for (const auto* r : node->Children) { if (r->Children.empty()) { if (extractTabletId != r->TabletId) { - Receivers.insert(r->TabletId); + PredicateRecipients[r->TabletId] = false; } } } } InitPartitions(); - - PartitionRepliesCount = 0; - PartitionRepliesExpected = 0; - - ReadSetCount = 0; } void TDistributedTransaction::OnPlanStep(ui64 step) @@ -234,14 +238,18 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event, Y_ABORT_UNLESS((Step == Max()) || (event.HasStep() && (Step == event.GetStep()))); Y_ABORT_UNLESS(event.HasTxId() && (TxId == event.GetTxId())); - if (Senders.contains(event.GetTabletProducer())) { + if (PredicatesReceived.contains(event.GetTabletProducer())) { NKikimrTx::TReadSetData data; Y_ABORT_UNLESS(event.HasReadSet() && data.ParseFromString(event.GetReadSet())); SetDecision(ParticipantsDecision, data.GetDecision()); ReadSetAcks[sender] = std::move(ack); - ++ReadSetCount; + auto& p = PredicatesReceived[event.GetTabletProducer()]; + if (!p.HasPredicate()) { + p.SetPredicate(data.GetDecision() == NKikimrTx::TReadSetData::DECISION_COMMIT); + ++ReadSetCount; + } } else { Y_DEBUG_ABORT("unknown sender tablet %" PRIu64, event.GetTabletProducer()); } @@ -252,7 +260,10 @@ void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event Y_ABORT_UNLESS(event.HasStep() && (Step == event.GetStep())); Y_ABORT_UNLESS(event.HasTxId() && (TxId == event.GetTxId())); - Receivers.erase(event.GetTabletConsumer()); + if (PredicateRecipients.contains(event.GetTabletConsumer())) { + PredicateRecipients[event.GetTabletConsumer()] = true; + ++PredicateAcksCount; + } } void TDistributedTransaction::OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event) @@ -271,7 +282,7 @@ auto TDistributedTransaction::GetDecision() const -> EDecision constexpr EDecision abort = NKikimrTx::TReadSetData::DECISION_ABORT; constexpr EDecision unknown = NKikimrTx::TReadSetData::DECISION_UNKNOWN; - EDecision aggrDecision = Senders.empty() ? commit : ParticipantsDecision; + const EDecision aggrDecision = PredicatesReceived.empty() ? commit : ParticipantsDecision; if ((SelfDecision == commit) && (aggrDecision == commit)) { return commit; @@ -286,14 +297,14 @@ auto TDistributedTransaction::GetDecision() const -> EDecision bool TDistributedTransaction::HaveParticipantsDecision() const { return - (Senders.size() == ReadSetCount) && + (PredicatesReceived.size() == ReadSetCount) && (ParticipantsDecision != NKikimrTx::TReadSetData::DECISION_UNKNOWN) || - Senders.empty(); + PredicatesReceived.empty(); } bool TDistributedTransaction::HaveAllRecipientsReceive() const { - return Receivers.empty(); + return PredicateRecipients.size() == PredicateAcksCount; } void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& request, @@ -321,6 +332,18 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque Y_FAIL_S("unknown transaction type"); } + tx.MutableOperations()->Add(Operations.begin(), Operations.end()); + if (SelfDecision != NKikimrTx::TReadSetData::DECISION_UNKNOWN) { + tx.SetPredicate(SelfDecision == NKikimrTx::TReadSetData::DECISION_COMMIT); + } + + for (auto& [_, predicate] : PredicatesReceived) { + *tx.AddPredicatesReceived() = predicate; + } + for (auto& [tabletId, _] : PredicateRecipients) { + tx.AddPredicateRecipients(tabletId); + } + Y_ABORT_UNLESS(SourceActor != TActorId()); ActorIdToProto(SourceActor, tx.MutableSourceActor()); @@ -336,19 +359,6 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque void TDistributedTransaction::AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx) { - for (ui64 tabletId : Senders) { - tx.AddSenders(tabletId); - } - for (ui64 tabletId : Receivers) { - tx.AddReceivers(tabletId); - } - tx.MutableOperations()->Add(Operations.begin(), Operations.end()); - if (SelfDecision != NKikimrTx::TReadSetData::DECISION_UNKNOWN) { - tx.SetSelfPredicate(SelfDecision == NKikimrTx::TReadSetData::DECISION_COMMIT); - } - if (ParticipantsDecision != NKikimrTx::TReadSetData::DECISION_UNKNOWN) { - tx.SetAggrPredicate(ParticipantsDecision == NKikimrTx::TReadSetData::DECISION_COMMIT); - } if (WriteId.Defined()) { SetWriteId(tx, *WriteId); } @@ -368,6 +378,8 @@ void TDistributedTransaction::AddCmdDelete(NKikimrClient::TKeyValueRequest& requ range->SetIncludeFrom(true); range->SetTo(key); range->SetIncludeTo(true); + + PQ_LOG_D("add CmdDeleteRange for key " << key); } void TDistributedTransaction::SetDecision(NKikimrTx::TReadSetData::EDecision& var, NKikimrTx::TReadSetData::EDecision value) diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index 151dac913233..a82e1fcaf6ed 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -46,8 +46,8 @@ struct TDistributedTransaction { EState State = NKikimrPQ::TTransaction::UNKNOWN; ui64 MinStep = Max(); ui64 MaxStep = Max(); - THashSet Senders; // список отправителей TEvReadSet - THashSet Receivers; // список получателей TEvReadSet + THashMap PredicatesReceived; + THashMap PredicateRecipients; TVector Operations; TMaybe WriteId; @@ -111,6 +111,7 @@ struct TDistributedTransaction { const TVector& GetBindedMsgs(ui64 tabletId); bool HasWriteOperations = false; + size_t PredicateAcksCount = 0; }; } diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 6159a8780b0c..a35fe61765ba 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -797,8 +797,8 @@ Y_UNIT_TEST_F(Multiple_PQTablets, TPQTabletFixture) WaitProposeTransactionResponse({.TxId=txId_2, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); - WaitReadSetAck(*tablet, {.Step=100, .TxId=txId_2, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId}); tablet->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId}); + WaitReadSetAck(*tablet, {.Step=100, .TxId=txId_2, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId}); // // TODO(abcdef): проверить, что удалена информация о транзакции @@ -872,8 +872,8 @@ Y_UNIT_TEST_F(PQTablet_Send_RS_With_Abort, TPQTabletFixture) WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::ABORTED}); - WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId}); tablet->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId, .Source=Ctx->TabletId}); + WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId}); // // TODO(abcdef): проверить, что удалена информация о транзакции @@ -911,8 +911,8 @@ Y_UNIT_TEST_F(Partition_Send_Predicate_With_False, TPQTabletFixture) WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::ABORTED}); - WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId}); tablet->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId, .Source=Ctx->TabletId}); + WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId}); // // TODO(abcdef): проверить, что удалена информация о транзакции diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 9c7be1e9d6f1..34f5ee3505b6 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -1084,16 +1084,22 @@ message TTransaction { enum EState { UNKNOWN = 0; PREPARING = 1; - PREPARED = 2; // хранится + PREPARED = 2; // persist PLANNING = 3; - PLANNED = 4; // хранится. хотим выкинуть и не персистить + PLANNED = 4; // persist CALCULATING = 5; CALCULATED = 6; - WAIT_RS = 7; // хранится + WAIT_RS = 7; // persist EXECUTING = 8; EXECUTED = 9; + DELETING = 10; }; + message TPredicateReceived { + optional uint64 TabletId = 1; + optional bool Predicate = 2; + } + optional EKind Kind = 11; optional uint64 Step = 8; optional uint64 TxId = 1; @@ -1101,14 +1107,14 @@ message TTransaction { optional uint64 MinStep = 3; optional uint64 MaxStep = 4; + repeated TPredicateReceived PredicatesReceived = 5; + repeated uint64 PredicateRecipients = 6; + optional bool Predicate = 9; + // // TDataTransaction // - repeated uint64 Senders = 5; - repeated uint64 Receivers = 6; repeated TPartitionOperation Operations = 7; - optional bool SelfPredicate = 9; // только предикаты партиций. предикаты коллег отдельно - optional bool AggrPredicate = 10; // // TConfigTransaction