diff --git a/ydb/core/persqueue/partition_scale_manager.cpp b/ydb/core/persqueue/partition_scale_manager.cpp index 8d03b87e1d04..b36fdb18584e 100644 --- a/ydb/core/persqueue/partition_scale_manager.cpp +++ b/ydb/core/persqueue/partition_scale_manager.cpp @@ -8,12 +8,14 @@ namespace NPQ { TPartitionScaleManager::TPartitionScaleManager( const TString& topicName, + const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config ) : TopicName(topicName) + , TopicPath(topicPath) , DatabasePath(databasePath) , BalancerConfig(pathId, version, config) { } @@ -45,6 +47,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) { << "send split request"); CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest( TopicName, + TopicPath, DatabasePath, BalancerConfig.PathId, BalancerConfig.PathVersion, diff --git a/ydb/core/persqueue/partition_scale_manager.h b/ydb/core/persqueue/partition_scale_manager.h index dc46b38f0831..39251d5b0610 100644 --- a/ydb/core/persqueue/partition_scale_manager.h +++ b/ydb/core/persqueue/partition_scale_manager.h @@ -47,7 +47,7 @@ class TPartitionScaleManager { }; public: - TPartitionScaleManager(const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config); + TPartitionScaleManager(const TString& topicName, const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config); public: void HandleScaleStatusChange(const ui32 partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx); @@ -71,6 +71,7 @@ class TPartitionScaleManager { static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000; const TString TopicName; + const TString TopicPath; TString DatabasePath = ""; TActorId CurrentScaleRequest; TDuration RequestTimeout = TDuration::MilliSeconds(0); diff --git a/ydb/core/persqueue/partition_scale_request.cpp b/ydb/core/persqueue/partition_scale_request.cpp index 28e7d8f7a595..4697a57b14fc 100644 --- a/ydb/core/persqueue/partition_scale_request.cpp +++ b/ydb/core/persqueue/partition_scale_request.cpp @@ -4,15 +4,17 @@ namespace NKikimr { namespace NPQ { TPartitionScaleRequest::TPartitionScaleRequest( - TString topicName, - TString databasePath, + const TString& topicName, + const TString& topicPath, + const TString& databasePath, ui64 pathId, ui64 pathVersion, - std::vector splits, - const std::vector merges, - NActors::TActorId parentActorId + const std::vector& splits, + const std::vector& merges, + const NActors::TActorId& parentActorId ) : Topic(topicName) + , TopicPath(topicPath) , DatabasePath(databasePath) , PathId(pathId) , PathVersion(pathVersion) @@ -30,14 +32,17 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) { void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) { auto proposal = std::make_unique(); proposal->Record.SetDatabaseName(CanonizePath(DatabasePath)); - FillProposeRequest(*proposal, DatabasePath, Topic, ctx); + FillProposeRequest(*proposal, ctx); ctx.Send(MakeTxProxyID(), proposal.release()); } -void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx) { +void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx) { + auto workingDir = TopicPath.substr(0, TopicPath.size() - Topic.size()); + auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme(); modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup); modifyScheme.SetWorkingDir(workingDir); + modifyScheme.SetInternal(true); auto applyIf = modifyScheme.AddApplyIf(); applyIf->SetPathId(PathId); @@ -45,9 +50,9 @@ void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransa applyIf->SetCheckEntityVersion(true); NKikimrSchemeOp::TPersQueueGroupDescription groupDescription; - groupDescription.SetName(topicName); + groupDescription.SetName(Topic); TStringBuilder logMessage; - logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: "; + logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions of '" << workingDir << "/" << Topic << "'. Spilts: "; for(const auto& split: Splits) { auto* newSplit = groupDescription.AddSplit(); logMessage << "partition: " << split.GetPartition() << " boundary: '" << split.GetSplitBoundary() << "' "; diff --git a/ydb/core/persqueue/partition_scale_request.h b/ydb/core/persqueue/partition_scale_request.h index 017825e78d82..764c8f033eb0 100644 --- a/ydb/core/persqueue/partition_scale_request.h +++ b/ydb/core/persqueue/partition_scale_request.h @@ -26,7 +26,10 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped splits, const std::vector merges, NActors::TActorId parentActorId); + TPartitionScaleRequest(const TString& topicName, const TString& topicPath, const TString& databasePath, ui64 pathId, ui64 pathVersion, + const std::vector& splits, + const std::vector& merges, + const NActors::TActorId& parentActorId); public: void Bootstrap(const NActors::TActorContext &ctx); @@ -48,10 +51,11 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped SplitPath(const TString& path); void SendProposeRequest(const NActors::TActorContext &ctx); - void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx); + void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx); private: const TString Topic; + const TString TopicPath; const TString DatabasePath; const ui64 PathId; const ui64 PathVersion; diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 547acd21d706..5fc814b94bc0 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -537,7 +537,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr if (SplitMergeEnabled(TabletConfig)) { if (!PartitionsScaleManager) { - PartitionsScaleManager = std::make_unique(Topic, DatabasePath, PathId, Version, TabletConfig); + PartitionsScaleManager = std::make_unique(Topic, Path, DatabasePath, PathId, Version, TabletConfig); } else { PartitionsScaleManager->UpdateBalancerConfig(PathId, Version, TabletConfig); } @@ -1266,16 +1266,20 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr& void TPersQueueReadBalancer::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx) { if (!SplitMergeEnabled(TabletConfig)) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: autopartitioning disabled."); return; } auto& record = ev->Get()->Record; auto* node = PartitionGraph.GetPartition(record.GetPartitionId()); if (!node) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: partition " << record.GetPartitionId() << " not found."); return; } if (PartitionsScaleManager) { PartitionsScaleManager->HandleScaleStatusChange(record.GetPartitionId(), record.GetScaleStatus(), ctx); + } else { + LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: scale manager isn`t initialized."); } } diff --git a/ydb/core/persqueue/read_balancer__txinit.h b/ydb/core/persqueue/read_balancer__txinit.h index cc9a26ad4678..8b2367a15489 100644 --- a/ydb/core/persqueue/read_balancer__txinit.h +++ b/ydb/core/persqueue/read_balancer__txinit.h @@ -60,7 +60,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction { Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig); if (SplitMergeEnabled(Self->TabletConfig)) { - Self->PartitionsScaleManager = std::make_unique(Self->Topic, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig); + Self->PartitionsScaleManager = std::make_unique(Self->Topic, Self->Path, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig); } Self->UpdateConfigCounters(); } diff --git a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp index 4f01439c8601..db447a2c5558 100644 --- a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp @@ -96,7 +96,7 @@ TTopicSdkTestSetup CreateSetup() { NKikimrConfig::TFeatureFlags ff; ff.SetEnableTopicSplitMerge(true); ff.SetEnablePQConfigTransactionsAtSchemeShard(true); - //ff.SetEnableTopicServiceTx(true); + ff.SetEnableTopicServiceTx(true); auto settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetFeatureFlags(ff); diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index 0b955e0b4770..e8dc6c2a3de7 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -894,6 +894,60 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { } } + void ExecuteQuery(NYdb::NTable::TSession& session, const TString& query ) { + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + Y_UNIT_TEST(WithDir_PartitionSplit_AutosplitByLoad) { + TTopicSdkTestSetup setup = CreateSetup(); + auto client = setup.MakeClient(); + auto tableClient = setup.MakeTableClient(); + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + + setup.GetServer().AnnoyingClient->MkDir("/Root", "dir"); + + ExecuteQuery(session, R"( + --!syntax_v1 + CREATE TOPIC `/Root/dir/origin` + WITH ( + AUTO_PARTITIONING_STRATEGY = 'SCALE_UP', + MAX_ACTIVE_PARTITIONS = 50 + ); + )"); + + { + auto describe = client.DescribeTopic("/Root/dir/origin").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1); + } + + ui64 balancerTabletId; + { + auto pathDescr = setup.GetServer().AnnoyingClient->Ls("/Root/dir/origin")->Record.GetPathDescription().GetSelf(); + balancerTabletId = pathDescr.GetBalancerTabletID(); + Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush; + UNIT_ASSERT(balancerTabletId); + } + + { + const auto edge = setup.GetRuntime().AllocateEdgeActor(); + setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(0, NKikimrPQ::EScaleStatus::NEED_SPLIT)); + } + + { + size_t partitionCount = 0; + for (size_t i = 0; i < 10; ++i) { + Sleep(TDuration::Seconds(1)); + auto describe = client.DescribeTopic("/Root/dir/origin").GetValueSync(); + partitionCount = describe.GetTopicDescription().GetPartitions().size(); + if (partitionCount == 3) { + break; + } + } + UNIT_ASSERT_VALUES_EQUAL(partitionCount, 3); + } + } + Y_UNIT_TEST(MidOfRange) { auto AsString = [](std::vector vs) { TStringBuilder a; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp index c3acc4654003..5b9ed0d61c89 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -136,3 +136,9 @@ TTopicClient TTopicSdkTestSetup::MakeClient() const { return TTopicClient(MakeDriver()); } + +NYdb::NTable::TTableClient TTopicSdkTestSetup::MakeTableClient() const +{ + return NYdb::NTable::TTableClient(MakeDriver(), NYdb::NTable::TClientSettings() + .UseQueryCache(false)); +} diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h index 2bdae4489c50..2fdd0bc70327 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h @@ -33,6 +33,7 @@ class TTopicSdkTestSetup { TLog& GetLog(); TTopicClient MakeClient() const; + NYdb::NTable::TTableClient MakeTableClient() const; TDriver MakeDriver() const; TDriver MakeDriver(const TDriverConfig& config) const;