diff --git a/ydb/core/persqueue/partition_scale_manager.cpp b/ydb/core/persqueue/partition_scale_manager.cpp index 7c83d53c2f0c..ef90ae9508e7 100644 --- a/ydb/core/persqueue/partition_scale_manager.cpp +++ b/ydb/core/persqueue/partition_scale_manager.cpp @@ -17,6 +17,8 @@ TPartitionScaleManager::TPartitionScaleManager( void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) { if (scaleStatus == NKikimrPQ::EScaleStatus::NEED_SPLIT) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange " + << "need to split partition " << partition); PartitionsToSplit.emplace(partition.Id, partition); TrySendScaleRequest(ctx); } else { @@ -30,12 +32,14 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) { return; } - auto splitMergePair = BuildScaleRequest(); + auto splitMergePair = BuildScaleRequest(ctx); if (splitMergePair.first.empty() && splitMergePair.second.empty()) { return; } RequestInflight = true; + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange " + << "send split request"); CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest( TopicName, DatabasePath, @@ -51,7 +55,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) { using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit; using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge; -std::pair, std::vector> TPartitionScaleManager::BuildScaleRequest() { +std::pair, std::vector> TPartitionScaleManager::BuildScaleRequest(const TActorContext& ctx) { std::vector splitsToApply; std::vector mergesToApply; @@ -62,11 +66,15 @@ std::pair, std::vector> TPartition const auto& partition = itSplit->second; if (BalancerConfig.PartitionGraph.GetPartition(partitionId)->Children.empty()) { - auto mid = GetRangeMid(partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "", partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : ""); + auto from = partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : ""; + auto to = partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : ""; + auto mid = GetRangeMid(from, to); if (mid.empty()) { itSplit = PartitionsToSplit.erase(itSplit); + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest wrong partition key range. Can't get mid. Topic# " << TopicName << ", partition# " << partitionId); continue; } + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest partition split ranges. From# '" << from << "'. To# '" << to << "'. Mid# '" << mid <<"'. Topic# " << TopicName << ". Partition# " << partitionId); TPartitionSplit split; split.set_partition(partition.Id); @@ -87,6 +95,7 @@ void TPartitionScaleManager::HandleScaleRequestResult(TPartitionScaleRequest::TE RequestInflight = false; LastResponseTime = ctx.Now(); auto result = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleRequestResult scale request result: " << result->Status << ". Topic# " << TopicName); if (result->Status == TEvTxUserProxy::TResultStatus::ExecComplete) { TrySendScaleRequest(ctx); } else { diff --git a/ydb/core/persqueue/partition_scale_manager.h b/ydb/core/persqueue/partition_scale_manager.h index 30d75ea39a47..8d76b1fe97ae 100644 --- a/ydb/core/persqueue/partition_scale_manager.h +++ b/ydb/core/persqueue/partition_scale_manager.h @@ -66,7 +66,7 @@ class TPartitionScaleManager { using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit; using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge; - std::pair, std::vector> BuildScaleRequest(); + std::pair, std::vector> BuildScaleRequest(const TActorContext& ctx); public: static const ui64 TRY_SCALE_REQUEST_WAKE_UP_TAG = 10; diff --git a/ydb/core/persqueue/partition_scale_request.cpp b/ydb/core/persqueue/partition_scale_request.cpp index 9bb1e013bac7..a24b263e2368 100644 --- a/ydb/core/persqueue/partition_scale_request.cpp +++ b/ydb/core/persqueue/partition_scale_request.cpp @@ -30,11 +30,11 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) { void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) { auto proposal = std::make_unique(); proposal->Record.SetDatabaseName(CanonizePath(DatabasePath)); - FillProposeRequest(*proposal, DatabasePath, Topic); + FillProposeRequest(ctx,*proposal, DatabasePath, Topic); ctx.Send(MakeTxProxyID(), proposal.release()); } -void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName) { +void TPartitionScaleRequest::FillProposeRequest(const NActors::TActorContext &ctx, TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName) { auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme(); modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup); modifyScheme.SetWorkingDir(workingDir); @@ -46,11 +46,14 @@ void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransa NKikimrSchemeOp::TPersQueueGroupDescription groupDescription; groupDescription.SetName(topicName); - + TStringBuilder logMessage; + logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: "; for(const auto& split: Splits) { auto* newSplit = groupDescription.AddSplit(); + logMessage << "partition: " << split.GetPartition() << " boundary: '" << split.GetSplitBoundary() << "' "; *newSplit = split; } + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, logMessage); for(const auto& merge: Merges) { auto* newMerge = groupDescription.AddMerge(); @@ -98,7 +101,8 @@ void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus: for (auto& issue : ev->Get()->Record.GetIssues()) { issues << issue.ShortDebugString() + ", "; } - Cerr << "\n SAVDGB " << issues << "\n"; + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleRequest " + << "SchemaShard error when trying to execute a split request: " << issues); Send(ParentActorId, scaleRequestResult.release()); Die(ctx); } else { diff --git a/ydb/core/persqueue/partition_scale_request.h b/ydb/core/persqueue/partition_scale_request.h index 736b5196e35b..f1614d37d18d 100644 --- a/ydb/core/persqueue/partition_scale_request.h +++ b/ydb/core/persqueue/partition_scale_request.h @@ -12,7 +12,7 @@ namespace NKikimr { namespace NPQ { - + class TPartitionScaleRequest: public NActors::TActorBootstrapped { using TBase = NActors::TActorBootstrapped; @@ -48,7 +48,7 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped SplitPath(const TString& path); void SendProposeRequest(const NActors::TActorContext &ctx); - void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName); + void FillProposeRequest(const NActors::TActorContext &ctx, TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName); private: const TString Topic; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 1295038e1e51..285e61c005d6 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -541,17 +541,31 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { ProcessTimestampsForNewData(prevEndOffset, ctx); } -NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& /*ctx*/) { +NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) { auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed; - + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "TPartition::CheckScaleStatus writeSpeedUsagePercent# " << writeSpeedUsagePercent << " Topic: \"" << TopicName() << "\"." << + " Partition: " << Partition + ); auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT || Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE; - + auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE; if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) { + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "TPartition::CheckScaleStatus NEED_SPLIT" << " Topic: \"" << TopicName() << "\"." << + " Partition: " << Partition + ); return NKikimrPQ::EScaleStatus::NEED_SPLIT; } else if (mergeEnabled && writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) { + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "TPartition::CheckScaleStatus NEED_MERGE" << " Topic: \"" << TopicName() << "\"." << + " Partition: " << Partition + ); return NKikimrPQ::EScaleStatus::NEED_MERGE; } return NKikimrPQ::EScaleStatus::NORMAL; diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 5ad1f4c6931a..11785ef95b69 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -511,7 +511,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0); partitionsInfo[p.GetPartition()] = {p.GetTabletId(), {}}; - if (SplitMergeEnabled(TabletConfig)) { + if (SplitMergeEnabled(TabletConfig) && p.HasKeyRange()) { partitionsInfo[p.GetPartition()].KeyRange.DeserializeFromProto(p.GetKeyRange()); } diff --git a/ydb/core/persqueue/ut/autoscaling_ut.cpp b/ydb/core/persqueue/ut/autoscaling_ut.cpp index 896daa44278e..2ddf2bb96e44 100644 --- a/ydb/core/persqueue/ut/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/autoscaling_ut.cpp @@ -624,7 +624,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { a = "a"; b = {}; res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b); - Cerr << "\n SAVDBG " << res << "\n"; UNIT_ASSERT(a < res); UNIT_ASSERT(b != res);