Skip to content

Commit

Permalink
Disallow disabling of topic autopartitioning (#8871) (#8949)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Sep 10, 2024
1 parent f3a1a62 commit 17a9a61
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 70 deletions.
24 changes: 1 addition & 23 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,29 +753,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
f.Wait();

auto v = f.GetValueSync();
UNIT_ASSERT_C(!v.IsSuccess(), "Must receve error becuse max-partition is not 0");
}

{
TAlterTopicSettings alterSettings;
alterSettings
.BeginAlterPartitioningSettings()
.MaxActivePartitions(0)
.BeginAlterAutoPartitioningSettings()
.Strategy(EAutoPartitioningStrategy::Disabled)
.EndAlterAutoPartitioningSettings()
.EndAlterTopicPartitioningSettings();
auto f = client.AlterTopic(topicName, alterSettings);
f.Wait();

auto v = f.GetValueSync();
UNIT_ASSERT_C(v.IsSuccess(), "Error: " << v);
}

{
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), EAutoPartitioningStrategy::Disabled);
UNIT_ASSERT_C(!v.IsSuccess(), "Must receve error becuse disabling is not supported");
}
}

Expand Down
8 changes: 2 additions & 6 deletions ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,8 @@ class TAlterPQ: public TSubOperation {

if (alterConfig.HasPartitionStrategy() && !NPQ::SplitMergeEnabled(alterConfig)
&& tabletConfig->HasPartitionStrategy() && NPQ::SplitMergeEnabled(*tabletConfig)) {
if (!alterConfig.GetPartitionStrategy().HasMaxPartitionCount() || 0 != alterConfig.GetPartitionStrategy().GetMaxPartitionCount()) {
errStr = TStringBuilder() << "Can`t disable auto partitioning. Disabling auto partitioning is a destructive operation, "
<< "after which all partitions will become active and the message order guarantee will be violated. "
<< "If you are sure of this, then set max_active_partitions to 0.";
return nullptr;
}
errStr = TStringBuilder() << "Can`t disable auto partitioning.";
return nullptr;
}

if (!alterConfig.HasPartitionStrategy() && tabletConfig->HasPartitionStrategy()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,29 +667,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardTopicSplitMergeTest) {
partitionStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED);
}
}, {{TEvSchemeShard::EStatus::StatusInvalidParameter}});

ModifyTopic(runtime, env, txId, [&](auto& scheme) {
{
auto* partitionStrategy = scheme.MutablePQTabletConfig()->MutablePartitionStrategy();
partitionStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED);
partitionStrategy->SetMaxPartitionCount(0);
}
});

topic = DescribeTopic(runtime);

UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED),
static_cast<int>(topic.GetPQTabletConfig().GetPartitionStrategy().GetPartitionStrategyType()));

UNIT_ASSERT_VALUES_EQUAL(3, topic.GetPartitions().size());
for (const auto& p : topic.GetPartitions()) {
Cerr << ">>>>> Verify partition " << p.GetPartitionId() << Endl << Flush;
UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(::NKikimrPQ::ETopicPartitionStatus::Active), static_cast<int>(p.GetStatus()));
UNIT_ASSERT(p.GetChildPartitionIds().empty());
UNIT_ASSERT(p.GetParentPartitionIds().empty());
UNIT_ASSERT(!p.HasKeyRange());
}

} // Y_UNIT_TEST(DisableSplitMerge)

Y_UNIT_TEST(EnableSplitMerge) {
Expand Down
2 changes: 0 additions & 2 deletions ydb/services/datastreams/datastreams_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,6 @@ namespace NKikimr::NDataStreams::V1 {
t->SetScaleThresholdSeconds(ws.stabilization_window().seconds() ? ws.stabilization_window().seconds() : 300);
t->SetScaleUpPartitionWriteSpeedThresholdPercent(ws.up_utilization_percent() ? ws.up_utilization_percent() : 90);
t->SetScaleDownPartitionWriteSpeedThresholdPercent(ws.down_utilization_percent() ? ws.down_utilization_percent() : 30);
} else if (0 == s.max_active_partitions()) {
t->SetMaxPartitionCount(0);
}
}

Expand Down
17 changes: 1 addition & 16 deletions ydb/services/datastreams/datastreams_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2909,23 +2909,8 @@ Y_UNIT_TEST_SUITE(DataStreams) {
).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
if (result.GetStatus() != EStatus::SUCCESS) {
result.GetIssues().PrintTo(Cerr);
}
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto result = testServer.DataStreamsClient->DescribeStream(streamName2).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
Cerr << result.GetIssues().ToString() << "\n";
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto& d = result.GetResult().stream_description();
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
}

}

Y_UNIT_TEST(Test_Crreate_AutoPartitioning_Disabled) {
Expand Down

0 comments on commit 17a9a61

Please sign in to comment.