Skip to content

Commit

Permalink
Fix autopartitioning of topics with path that is not root of database (
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Oct 18, 2024
1 parent df4aa1a commit 4bbfda7
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 15 deletions.
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition_scale_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ namespace NPQ {

TPartitionScaleManager::TPartitionScaleManager(
const TString& topicName,
const TString& topicPath,
const TString& databasePath,
ui64 pathId,
int version,
const NKikimrPQ::TPQTabletConfig& config
)
: TopicName(topicName)
, TopicPath(topicPath)
, DatabasePath(databasePath)
, BalancerConfig(pathId, version, config) {
}
Expand Down Expand Up @@ -45,6 +47,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
<< "send split request");
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
TopicName,
TopicPath,
DatabasePath,
BalancerConfig.PathId,
BalancerConfig.PathVersion,
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/partition_scale_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TPartitionScaleManager {
};

public:
TPartitionScaleManager(const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config);
TPartitionScaleManager(const TString& topicName, const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config);

public:
void HandleScaleStatusChange(const ui32 partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx);
Expand All @@ -71,6 +71,7 @@ class TPartitionScaleManager {
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;

const TString TopicName;
const TString TopicPath;
TString DatabasePath = "";
TActorId CurrentScaleRequest;
TDuration RequestTimeout = TDuration::MilliSeconds(0);
Expand Down
23 changes: 14 additions & 9 deletions ydb/core/persqueue/partition_scale_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ namespace NKikimr {
namespace NPQ {

TPartitionScaleRequest::TPartitionScaleRequest(
TString topicName,
TString databasePath,
const TString& topicName,
const TString& topicPath,
const TString& databasePath,
ui64 pathId,
ui64 pathVersion,
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
NActors::TActorId parentActorId
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit>& splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge>& merges,
const NActors::TActorId& parentActorId
)
: Topic(topicName)
, TopicPath(topicPath)
, DatabasePath(databasePath)
, PathId(pathId)
, PathVersion(pathVersion)
Expand All @@ -30,24 +32,27 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) {
auto proposal = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
proposal->Record.SetDatabaseName(CanonizePath(DatabasePath));
FillProposeRequest(*proposal, DatabasePath, Topic, ctx);
FillProposeRequest(*proposal, ctx);
ctx.Send(MakeTxProxyID(), proposal.release());
}

void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx) {
void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx) {
auto workingDir = TopicPath.substr(0, TopicPath.size() - Topic.size());

auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup);
modifyScheme.SetWorkingDir(workingDir);
modifyScheme.SetInternal(true);

auto applyIf = modifyScheme.AddApplyIf();
applyIf->SetPathId(PathId);
applyIf->SetPathVersion(PathVersion == 0 ? 1 : PathVersion);
applyIf->SetCheckEntityVersion(true);

NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
groupDescription.SetName(topicName);
groupDescription.SetName(Topic);
TStringBuilder logMessage;
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: ";
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions of '" << workingDir << "/" << Topic << "'. Spilts: ";
for(const auto& split: Splits) {
auto* newSplit = groupDescription.AddSplit();
logMessage << "partition: " << split.GetPartition() << " boundary: '" << split.GetSplitBoundary() << "' ";
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/persqueue/partition_scale_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
};

public:
TPartitionScaleRequest(TString topicName, TString databasePath, ui64 pathId, ui64 pathVersion, std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits, const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges, NActors::TActorId parentActorId);
TPartitionScaleRequest(const TString& topicName, const TString& topicPath, const TString& databasePath, ui64 pathId, ui64 pathVersion,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit>& splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge>& merges,
const NActors::TActorId& parentActorId);

public:
void Bootstrap(const NActors::TActorContext &ctx);
Expand All @@ -48,10 +51,11 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
}
std::pair<TString, TString> SplitPath(const TString& path);
void SendProposeRequest(const NActors::TActorContext &ctx);
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx);
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx);

private:
const TString Topic;
const TString TopicPath;
const TString DatabasePath;
const ui64 PathId;
const ui64 PathVersion;
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr

if (SplitMergeEnabled(TabletConfig)) {
if (!PartitionsScaleManager) {
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, DatabasePath, PathId, Version, TabletConfig);
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, Path, DatabasePath, PathId, Version, TabletConfig);
} else {
PartitionsScaleManager->UpdateBalancerConfig(PathId, Version, TabletConfig);
}
Expand Down Expand Up @@ -1266,16 +1266,20 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr&

void TPersQueueReadBalancer::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx) {
if (!SplitMergeEnabled(TabletConfig)) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: autopartitioning disabled.");
return;
}
auto& record = ev->Get()->Record;
auto* node = PartitionGraph.GetPartition(record.GetPartitionId());
if (!node) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: partition " << record.GetPartitionId() << " not found.");
return;
}

if (PartitionsScaleManager) {
PartitionsScaleManager->HandleScaleStatusChange(record.GetPartitionId(), record.GetScaleStatus(), ctx);
} else {
LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: scale manager isn`t initialized.");
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/read_balancer__txinit.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);

if (SplitMergeEnabled(Self->TabletConfig)) {
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig);
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->Path, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig);
}
Self->UpdateConfigCounters();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ TTopicSdkTestSetup CreateSetup() {
NKikimrConfig::TFeatureFlags ff;
ff.SetEnableTopicSplitMerge(true);
ff.SetEnablePQConfigTransactionsAtSchemeShard(true);
//ff.SetEnableTopicServiceTx(true);
ff.SetEnableTopicServiceTx(true);

auto settings = TTopicSdkTestSetup::MakeServerSettings();
settings.SetFeatureFlags(ff);
Expand Down
54 changes: 54 additions & 0 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,60 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
}
}

void ExecuteQuery(NYdb::NTable::TSession& session, const TString& query ) {
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

Y_UNIT_TEST(WithDir_PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
auto client = setup.MakeClient();
auto tableClient = setup.MakeTableClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();

setup.GetServer().AnnoyingClient->MkDir("/Root", "dir");

ExecuteQuery(session, R"(
--!syntax_v1
CREATE TOPIC `/Root/dir/origin`
WITH (
AUTO_PARTITIONING_STRATEGY = 'SCALE_UP',
MAX_ACTIVE_PARTITIONS = 50
);
)");

{
auto describe = client.DescribeTopic("/Root/dir/origin").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
}

ui64 balancerTabletId;
{
auto pathDescr = setup.GetServer().AnnoyingClient->Ls("/Root/dir/origin")->Record.GetPathDescription().GetSelf();
balancerTabletId = pathDescr.GetBalancerTabletID();
Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush;
UNIT_ASSERT(balancerTabletId);
}

{
const auto edge = setup.GetRuntime().AllocateEdgeActor();
setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(0, NKikimrPQ::EScaleStatus::NEED_SPLIT));
}

{
size_t partitionCount = 0;
for (size_t i = 0; i < 10; ++i) {
Sleep(TDuration::Seconds(1));
auto describe = client.DescribeTopic("/Root/dir/origin").GetValueSync();
partitionCount = describe.GetTopicDescription().GetPartitions().size();
if (partitionCount == 3) {
break;
}
}
UNIT_ASSERT_VALUES_EQUAL(partitionCount, 3);
}
}

Y_UNIT_TEST(MidOfRange) {
auto AsString = [](std::vector<ui16> vs) {
TStringBuilder a;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,9 @@ TTopicClient TTopicSdkTestSetup::MakeClient() const
{
return TTopicClient(MakeDriver());
}

NYdb::NTable::TTableClient TTopicSdkTestSetup::MakeTableClient() const
{
return NYdb::NTable::TTableClient(MakeDriver(), NYdb::NTable::TClientSettings()
.UseQueryCache(false));
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TTopicSdkTestSetup {
TLog& GetLog();

TTopicClient MakeClient() const;
NYdb::NTable::TTableClient MakeTableClient() const;

TDriver MakeDriver() const;
TDriver MakeDriver(const TDriverConfig& config) const;
Expand Down

0 comments on commit 4bbfda7

Please sign in to comment.