Skip to content

Commit

Permalink
Split the partition only if there is more than 1 producer (#7379)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Aug 2, 2024
1 parent 0442860 commit 53e1321
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 62 deletions.
2 changes: 2 additions & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
TDeque<std::unique_ptr<IEventBase>> PendingEvents;
TRowVersion LastEmittedHeartbeat;

TLastCounter SourceIdCounter;

const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config);

bool ClosedInternalPartition = false;
Expand Down
41 changes: 27 additions & 14 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,14 @@ void TPartition::HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ct

void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
PQ_LOG_T("TPartition::AnswerCurrentWrites. Responses.size()=" << Responses.size());
const auto now = ctx.Now();

ui64 offset = EndOffset;
while (!Responses.empty()) {
const auto& response = Responses.front();

const TDuration queueTime = response.QueueTime;
const TDuration writeTime = ctx.Now() - response.WriteTimeBaseline;
const TDuration writeTime = now - response.WriteTimeBaseline;

if (response.IsWrite()) {
const auto& writeResponse = response.GetWrite();
Expand All @@ -257,6 +258,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {

bool already = false;

SourceIdCounter.Use(s, now);
auto it = SourceIdStorage.GetInMemorySourceIds().find(s);

ui64 maxSeqNo = 0;
Expand Down Expand Up @@ -485,13 +487,15 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
}
HaveWriteMsg = false;

const auto now = ctx.Now();

for (auto& [sourceId, info] : TxSourceIdForPostPersist) {
auto it = SourceIdStorage.GetInMemorySourceIds().find(sourceId);
if (it.IsEnd()) {
SourceIdStorage.RegisterSourceId(sourceId, info.SeqNo, info.Offset, ctx.Now());
SourceIdStorage.RegisterSourceId(sourceId, info.SeqNo, info.Offset, now);
} else {
ui64 seqNo = std::max(info.SeqNo, it->second.SeqNo);
SourceIdStorage.RegisterSourceId(sourceId, it->second.Updated(seqNo, info.Offset, ctx.Now()));
SourceIdStorage.RegisterSourceId(sourceId, it->second.Updated(seqNo, info.Offset, now));
}
}
TxSourceIdForPostPersist.clear();
Expand All @@ -505,8 +509,8 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
}
ui64 prevEndOffset = EndOffset;

ui32 totalLatencyMs = (ctx.Now() - WriteCycleStartTime).MilliSeconds();
ui32 writeLatencyMs = (ctx.Now() - WriteStartTime).MilliSeconds();
ui32 totalLatencyMs = (now - WriteCycleStartTime).MilliSeconds();
ui32 writeLatencyMs = (now - WriteStartTime).MilliSeconds();

WriteLatency.IncFor(writeLatencyMs, 1);
if (writeLatencyMs >= AppData(ctx)->PQConfig.GetWriteLatencyBigMs()) {
Expand All @@ -522,7 +526,6 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
UpdateAfterWriteCounters(true);

//All ok
auto now = ctx.Now();
for (auto& avg : AvgWriteBytes) {
avg.Update(WriteNewSize, now);
}
Expand All @@ -538,11 +541,9 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
if (SupportivePartitionTimeLag) {
SupportivePartitionTimeLag->UpdateTimestamp(now.MilliSeconds());
}
if (SplitMergeEnabled(Config)) {
SplitMergeAvgWriteBytes->Update(WriteNewSizeFull, now);
auto needScaling = CheckScaleStatus(ctx);
ChangeScaleStatusIfNeeded(needScaling);
}

auto writeNewSizeFull = WriteNewSizeFull;

WriteCycleSize = 0;
WriteNewSize = 0;
WriteNewSizeFull = 0;
Expand All @@ -556,6 +557,12 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
AnswerCurrentWrites(ctx);
SyncMemoryStateWithKVState(ctx);

if (SplitMergeEnabled(Config)) {
SplitMergeAvgWriteBytes->Update(writeNewSizeFull, now);
auto needScaling = CheckScaleStatus(ctx);
ChangeScaleStatusIfNeeded(needScaling);
}

//if EndOffset changed there could be subscriptions which could be completed
TVector<std::pair<TReadInfo, ui64>> reads = Subscriber.GetReads(EndOffset);
for (auto& read : reads) {
Expand All @@ -569,23 +576,28 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
}

NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;
const auto writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;
const auto sourceIdWindow = TDuration::Seconds(std::min<ui32>(5, Config.GetPartitionStrategy().GetScaleThresholdSeconds()));
const auto sourceIdCount = SourceIdCounter.Count(ctx.Now() - sourceIdWindow);

LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus"
<< " splitMergeAvgWriteBytes# " << SplitMergeAvgWriteBytes->GetValue()
<< " writeSpeedUsagePercent# " << writeSpeedUsagePercent
<< " scaleThresholdSeconds# " << Config.GetPartitionStrategy().GetScaleThresholdSeconds()
<< " totalPartitionWriteSpeed# " << TotalPartitionWriteSpeed
<< " sourceIdCount=" << sourceIdCount
<< " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
);

auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
|| Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent() && sourceIdCount > 1) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus NEED_SPLIT" << " Topic: \"" << TopicName() << "\"." <<
Expand All @@ -596,7 +608,8 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus NEED_MERGE" << " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
" Partition: " << Partition << " writeSpeedUsagePercent: " << writeSpeedUsagePercent <<
" Threshold: " << Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()
);
return NKikimrPQ::EScaleStatus::NEED_MERGE;
}
Expand Down
80 changes: 32 additions & 48 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
}


writeSession1->Close(TDuration::Seconds(1));
writeSession2->Close(TDuration::Seconds(1));
writeSession3->Close(TDuration::Seconds(1));
writeSession1->Close(TDuration::Seconds(2));
writeSession2->Close(TDuration::Seconds(2));
writeSession3->Close(TDuration::Seconds(2));
readSession.Close();
}

Expand Down Expand Up @@ -836,63 +836,47 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
.BeginConfigureAutoPartitioningSettings()
.UpUtilizationPercent(2)
.DownUtilizationPercent(1)
.StabilizationWindow(TDuration::Seconds(1))
.StabilizationWindow(TDuration::Seconds(2))
.Strategy(EAutoPartitioningStrategy::ScaleUp)
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(TEST_TOPIC, createSettings).Wait();

auto msg = TString(1_MB, 'a');

auto writeSession = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false);
UNIT_ASSERT(writeSession->Write(Msg(msg, 1)));
UNIT_ASSERT(writeSession->Write(Msg(msg, 2)));
Sleep(TDuration::Seconds(5));
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);

bool firstPartitionFound = false;
for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
if (partition.GetPartitionId() == 0) {
firstPartitionFound = true;
UNIT_ASSERT(!partition.GetActive());
UNIT_ASSERT_EQUAL(partition.GetChildPartitionIds().size(), 2);
auto childIds = partition.GetChildPartitionIds();
std::sort(childIds.begin(), childIds.end());
UNIT_ASSERT_EQUAL(childIds[0], 1);
UNIT_ASSERT_EQUAL(childIds[1], 2);
}
auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false);
auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, TEST_TOPIC, false);

{
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 1)));
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 2)));
Sleep(TDuration::Seconds(5));
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
}

UNIT_ASSERT(firstPartitionFound);

TString secondPartitionTo = "";
TString thirdPartitionFrom = "";
for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
if (partition.GetPartitionId() == 1 || partition.GetPartitionId() == 2) {
UNIT_ASSERT(partition.GetActive());
if (partition.GetPartitionId() == 1) {
UNIT_ASSERT(partition.GetToBound().Defined() && !partition.GetToBound()->Empty());
secondPartitionTo = *partition.GetToBound();
}
if (partition.GetPartitionId() == 2) {
UNIT_ASSERT(partition.GetFromBound().Defined() && !partition.GetFromBound()->Empty());
thirdPartitionFrom = *partition.GetFromBound();
}
UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds().size(), 1);
UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds()[0], 0);
}
{
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 3)));
UNIT_ASSERT(writeSession_2->Write(Msg(msg, 4)));
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 5)));
UNIT_ASSERT(writeSession_2->Write(Msg(msg, 6)));
Sleep(TDuration::Seconds(5));
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
}

UNIT_ASSERT(!secondPartitionTo.Empty());
UNIT_ASSERT(!thirdPartitionFrom.Empty());
auto writeSession2_1 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false);
auto writeSession2_2 = CreateWriteSession(client, "producer-2", 1, TEST_TOPIC, false);

auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false);
UNIT_ASSERT(writeSession2->Write(Msg(msg, 3)));
UNIT_ASSERT(writeSession2->Write(Msg(msg, 4)));
Sleep(TDuration::Seconds(5));
auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5);
{
UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 7)));
UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 8)));
UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 9)));
UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 10)));
Sleep(TDuration::Seconds(5));
auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5);
}
}

Y_UNIT_TEST(MidOfRange) {
Expand Down
81 changes: 81 additions & 0 deletions ydb/core/persqueue/ut/utils_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#include <ydb/core/persqueue/utils.h>

#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NPQ {

Y_UNIT_TEST_SUITE(TPQUtilsTest) {
    // Drives one TLastCounter through first use, repeated use, eviction of the
    // oldest entry and reordering. Each step is checked with a wide window
    // (now - 10s: every remembered value is counted) and, where it adds
    // information, with a tight window (now: only values used at `now` count),
    // so that the expiration filter in Count() is actually exercised.
    Y_UNIT_TEST(TLastCounter) {
        TLastCounter counter;

        TInstant now = TInstant::Now();

        {
            // Nothing has been used yet.
            auto r = counter.Count(now);
            UNIT_ASSERT_VALUES_EQUAL(r, 0);
        }

        {
            // First value is remembered.
            counter.Use("v-1", now);
            auto r = counter.Count(now);
            UNIT_ASSERT_VALUES_EQUAL(r, 1);
        }

        {
            // Re-using the same value at the same instant does not add an entry.
            counter.Use("v-1", now);
            auto r = counter.Count(now);
            UNIT_ASSERT_VALUES_EQUAL(r, 1);
        }

        now += TDuration::Seconds(1);

        {
            // Re-using the same value later only refreshes its timestamp.
            counter.Use("v-1", now);
            auto r = counter.Count(now - TDuration::Seconds(10));
            UNIT_ASSERT_VALUES_EQUAL(r, 1);
        }

        {
            // The refreshed timestamp is `now`, so the tight window still sees it.
            auto r = counter.Count(now);
            UNIT_ASSERT_VALUES_EQUAL(r, 1);
        }

        {
            // A second distinct value fills the counter (capacity is two).
            counter.Use("v-2", now);
            auto r = counter.Count(now - TDuration::Seconds(10));
            UNIT_ASSERT_VALUES_EQUAL(r, 2);
        }

        {
            // Both entries were used at `now`.
            auto r = counter.Count(now);
            UNIT_ASSERT_VALUES_EQUAL(r, 2);
        }

        {
            // Re-using an already remembered value keeps the count at two.
            counter.Use("v-1", now);
            auto r = counter.Count(now - TDuration::Seconds(10));
            UNIT_ASSERT_VALUES_EQUAL(r, 2);
        }

        now += TDuration::Seconds(1);

        {
            // A third value evicts the oldest entry; the counter never exceeds two.
            counter.Use("v-3", now);
            auto r = counter.Count(now - TDuration::Seconds(10));
            UNIT_ASSERT_VALUES_EQUAL(r, 2);
        }

        {
            // Only "v-3" was used at `now`; the surviving older entry is filtered out.
            auto r = counter.Count(now);
            UNIT_ASSERT_VALUES_EQUAL(r, 1);
        }

        now += TDuration::Seconds(1);

        {
            // Refreshing the newest entry in place.
            counter.Use("v-3", now);
            auto r = counter.Count(now - TDuration::Seconds(10));
            UNIT_ASSERT_VALUES_EQUAL(r, 2);
        }

        {
            // "v-2" still carries its old timestamp, so only "v-3" is counted.
            auto r = counter.Count(now);
            UNIT_ASSERT_VALUES_EQUAL(r, 1);
        }

        now += TDuration::Seconds(1);

        {
            // Refreshing the oldest entry moves it behind the other one.
            counter.Use("v-2", now);
            auto r = counter.Count(now - TDuration::Seconds(10));
            UNIT_ASSERT_VALUES_EQUAL(r, 2);
        }

        {
            // Only "v-2" was used at `now`.
            auto r = counter.Count(now);
            UNIT_ASSERT_VALUES_EQUAL(r, 1);
        }

        {
            // A cutoff later than every recorded use excludes everything.
            auto r = counter.Count(now + TDuration::Seconds(1));
            UNIT_ASSERT_VALUES_EQUAL(r, 0);
        }
    }
}

}
1 change: 1 addition & 0 deletions ydb/core/persqueue/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ SRCS(
pqrb_describes_ut.cpp
microseconds_sliding_window_ut.cpp
fetch_request_ut.cpp
utils_ut.cpp
)

RESOURCE(
Expand Down
27 changes: 27 additions & 0 deletions ydb/core/persqueue/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,31 @@ TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescrip
return TPartitionGraph(BuildGraph<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition>(config.GetPartitions()));
}

// Records that `value` was used at `now`.
//
// Values holds at most MaxValueCount entries. Three cases are handled in order:
//  1. `value` is the front entry: refresh its time if `now` is newer, and when
//     the container is full and the other entry was not also used at `now`,
//     move the refreshed entry to the back.
//  2. The container is full and `value` is the second entry: refresh in place.
//  3. `value` is not remembered: append it, evicting the front entry first when
//     full. If the container is full and the front entry is not older than
//     `now`, the new value is dropped.
void TLastCounter::Use(const TString& value, const TInstant& now) {
    const bool atCapacity = Values.size() == MaxValueCount;

    // Case 1: the value sits at the front.
    if (!Values.empty() && Values[0].Value == value) {
        auto& head = Values[0];
        if (head.LastUseTime >= now) {
            return; // not newer than what is already recorded
        }
        head.LastUseTime = now;
        if (atCapacity && Values[1].LastUseTime != now) {
            // Re-append the refreshed entry at the back, then drop the moved-from front.
            Values.push_back(std::move(head));
            Values.pop_front();
        }
        return;
    }

    // Case 2: the value is the second entry of a full container.
    if (atCapacity && Values[1].Value == value) {
        Values[1].LastUseTime = now;
        return;
    }

    // Case 3: a value that is not remembered yet.
    if (atCapacity) {
        if (Values[0].LastUseTime >= now) {
            return; // no room: the front entry is not older than `now`
        }
        Values.pop_front();
    }
    Values.push_back(Data{now, value});
}

// Returns the number of remembered values whose last use is at or after
// `expirationTime`.
size_t TLastCounter::Count(const TInstant& expirationTime) {
    size_t alive = 0;
    for (const auto& entry : Values) {
        if (entry.LastUseTime >= expirationTime) {
            ++alive;
        }
    }
    return alive;
}

} // NKikimr::NPQ
18 changes: 18 additions & 0 deletions ydb/core/persqueue/utils.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <deque>
#include <util/datetime/base.h>
#include <util/string/builder.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/pqconfig.pb.h>
Expand Down Expand Up @@ -69,4 +71,20 @@ TPartitionGraph MakePartitionGraph(const NKikimrPQ::TPQTabletConfig& config);
TPartitionGraph MakePartitionGraph(const NKikimrPQ::TUpdateBalancerConfig& config);
TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescription& config);

// Remembers a bounded number of string values together with the time each one
// was last used, and reports how many of them were used at or after a given
// point in time.
class TLastCounter {
// Upper bound on the number of values remembered at once.
static constexpr size_t MaxValueCount = 2;

public:
// Records that `value` was used at `now`.
void Use(const TString& value, const TInstant& now);
// Returns how many remembered values have a last-use time at or after
// `expirationTime`.
size_t Count(const TInstant& expirationTime);

private:
// One remembered value and the time of its last recorded use.
// Field order matters: entries are built with aggregate initialization as
// Data{time, value}.
struct Data {
TInstant LastUseTime;
TString Value;
};
// Remembered entries; Use() keeps the size at or below MaxValueCount.
std::deque<Data> Values;
};


} // NKikimr::NPQ

0 comments on commit 53e1321

Please sign in to comment.