Skip to content

Commit

Permalink
autoscaling logs
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev committed May 29, 2024
1 parent bc3a6f1 commit 732b4cc
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 15 deletions.
15 changes: 12 additions & 3 deletions ydb/core/persqueue/partition_scale_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ TPartitionScaleManager::TPartitionScaleManager(

void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) {
if (scaleStatus == NKikimrPQ::EScaleStatus::NEED_SPLIT) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange "
<< "need to split partition " << partition);
PartitionsToSplit.emplace(partition.Id, partition);
TrySendScaleRequest(ctx);
} else {
Expand All @@ -30,12 +32,14 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
return;
}

auto splitMergePair = BuildScaleRequest();
auto splitMergePair = BuildScaleRequest(ctx);
if (splitMergePair.first.empty() && splitMergePair.second.empty()) {
return;
}

RequestInflight = true;
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange "
<< "send split request");
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
TopicName,
DatabasePath,
Expand All @@ -51,7 +55,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;

std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartitionScaleManager::BuildScaleRequest() {
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartitionScaleManager::BuildScaleRequest(const TActorContext& ctx) {
std::vector<TPartitionSplit> splitsToApply;
std::vector<TPartitionMerge> mergesToApply;

Expand All @@ -62,11 +66,15 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
const auto& partition = itSplit->second;

if (BalancerConfig.PartitionGraph.GetPartition(partitionId)->Children.empty()) {
auto mid = GetRangeMid(partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "", partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : "");
auto from = partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "";
auto to = partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : "";
auto mid = GetRangeMid(from, to);
if (mid.empty()) {
itSplit = PartitionsToSplit.erase(itSplit);
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest wrong partition key range. Can't get mid. Topic# " << TopicName << ", partition# " << partitionId);
continue;
}
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest partition split ranges. From# '" << from << "'. To# '" << to << "'. Mid# '" << mid <<"'. Topic# " << TopicName << ". Partition# " << partitionId);

TPartitionSplit split;
split.set_partition(partition.Id);
Expand All @@ -87,6 +95,7 @@ void TPartitionScaleManager::HandleScaleRequestResult(TPartitionScaleRequest::TE
RequestInflight = false;
LastResponseTime = ctx.Now();
auto result = ev->Get();
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleRequestResult scale request result: " << result->Status << ". Topic# " << TopicName);
if (result->Status == TEvTxUserProxy::TResultStatus::ExecComplete) {
TrySendScaleRequest(ctx);
} else {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_scale_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class TPartitionScaleManager {
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;

std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> BuildScaleRequest();
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> BuildScaleRequest(const TActorContext& ctx);

public:
static const ui64 TRY_SCALE_REQUEST_WAKE_UP_TAG = 10;
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/persqueue/partition_scale_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
// Creates a TEvProposeTransaction, fills it with the scale (split/merge)
// description for this topic and sends it to the tx proxy.
//
// ctx - actor context used for sending the event and, inside
//       FillProposeRequest, for logging.
void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) {
    auto proposal = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
    proposal->Record.SetDatabaseName(CanonizePath(DatabasePath));

    // Fill the proposal exactly once, through the ctx-taking signature.
    // A second call would run the fill logic again on the same proposal.
    FillProposeRequest(ctx, *proposal, DatabasePath, Topic);

    // release() gives up unique_ptr ownership; Send receives the raw event pointer.
    ctx.Send(MakeTxProxyID(), proposal.release());
}

void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName) {
void TPartitionScaleRequest::FillProposeRequest(const NActors::TActorContext &ctx, TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName) {
auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup);
modifyScheme.SetWorkingDir(workingDir);
Expand All @@ -46,11 +46,14 @@ void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransa

NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
groupDescription.SetName(topicName);

TStringBuilder logMessage;
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: ";
for(const auto& split: Splits) {
auto* newSplit = groupDescription.AddSplit();
logMessage << "partition: " << split.GetPartition() << " boundary: '" << split.GetSplitBoundary() << "' ";
*newSplit = split;
}
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, logMessage);

for(const auto& merge: Merges) {
auto* newMerge = groupDescription.AddMerge();
Expand Down Expand Up @@ -98,7 +101,8 @@ void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus:
for (auto& issue : ev->Get()->Record.GetIssues()) {
issues << issue.ShortDebugString() + ", ";
}
Cerr << "\n SAVDGB " << issues << "\n";
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleRequest "
<< "SchemaShard error when trying to execute a split request: " << issues);
Send(ParentActorId, scaleRequestResult.release());
Die(ctx);
} else {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/partition_scale_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace NKikimr {
namespace NPQ {

class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScaleRequest> {
using TBase = NActors::TActorBootstrapped<TPartitionScaleRequest>;

Expand Down Expand Up @@ -48,7 +48,7 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
}
std::pair<TString, TString> SplitPath(const TString& path);
void SendProposeRequest(const NActors::TActorContext &ctx);
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName);
void FillProposeRequest(const NActors::TActorContext &ctx, TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName);

private:
const TString Topic;
Expand Down
20 changes: 17 additions & 3 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,17 +541,31 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
ProcessTimestampsForNewData(prevEndOffset, ctx);
}

NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& /*ctx*/) {
NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;

LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus writeSpeedUsagePercent# " << writeSpeedUsagePercent << " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
);
auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
|| Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus NEED_SPLIT" << " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
);
return NKikimrPQ::EScaleStatus::NEED_SPLIT;
} else if (mergeEnabled && writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus NEED_MERGE" << " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
);
return NKikimrPQ::EScaleStatus::NEED_MERGE;
}
return NKikimrPQ::EScaleStatus::NORMAL;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);

partitionsInfo[p.GetPartition()] = {p.GetTabletId(), {}};
if (SplitMergeEnabled(TabletConfig)) {
if (SplitMergeEnabled(TabletConfig) && p.HasKeyRange()) {
partitionsInfo[p.GetPartition()].KeyRange.DeserializeFromProto(p.GetKeyRange());
}

Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/ut/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
a = "a";
b = {};
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
Cerr << "\n SAVDBG " << res << "\n";
UNIT_ASSERT(a < res);
UNIT_ASSERT(b != res);

Expand Down

0 comments on commit 732b4cc

Please sign in to comment.