Skip to content

Commit

Permalink
Merge 13276ea into 1ef5236
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored May 30, 2024
2 parents 1ef5236 + 13276ea commit 72ab491
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 41 deletions.
30 changes: 30 additions & 0 deletions ydb/core/persqueue/ut/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,36 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
}

Y_UNIT_TEST(ControlPlane_AutoscalingWithStorageSizeRetention) {
auto autoscalingTestTopic = "autoscalit-topic";
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();

TCreateTopicSettings createSettings;
createSettings
.RetentionStorageMb(1024)
.BeginConfigurePartitioningSettings()
.BeginConfigureAutoscalingSettings()
.Strategy(EAutoscalingStrategy::ScaleUp)
.EndConfigureAutoscalingSettings()
.EndConfigurePartitioningSettings();
auto result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);

createSettings.RetentionStorageMb(0);
result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);

TAlterTopicSettings alterSettings;
alterSettings
.SetRetentionStorageMb(1024);

result = client.AlterTopic(autoscalingTestTopic, alterSettings).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
}

Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();
Expand Down
86 changes: 45 additions & 41 deletions ydb/services/lib/actors/pq_schema_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,10 @@ namespace NKikimr::NGRpcProxy::V1 {
error = TStringBuilder() << "Partition scale threshold time must be greater then 1 second, provided " << strategy.GetScaleThresholdSeconds() << " seconds";
return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}
if (strategy.GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED && config.GetPartitionConfig().HasStorageLimitBytes()) {
error = TStringBuilder() << "Partitions autoscaling is incompatible with retention storage bytes option";
return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
}

return std::nullopt;
}
Expand All @@ -736,6 +740,29 @@ namespace NKikimr::NGRpcProxy::V1 {
auto minParts = 1;
auto* pqTabletConfig = pqDescr->MutablePQTabletConfig();
auto partConfig = pqTabletConfig->MutablePartitionConfig();

switch (settings.retention_case()) {
case Ydb::PersQueue::V1::TopicSettings::kRetentionPeriodMs: {
partConfig->SetLifetimeSeconds(Max(settings.retention_period_ms() / 1000ll, 1ll));
}
break;

case Ydb::PersQueue::V1::TopicSettings::kRetentionStorageBytes: {
if (settings.retention_storage_bytes() <= 0) {
error = TStringBuilder() << "retention_storage_bytes must be positive, provided " <<
settings.retention_storage_bytes();
return Ydb::StatusIds::BAD_REQUEST;
}
partConfig->SetStorageLimitBytes(settings.retention_storage_bytes());
}
break;

default: {
error = TStringBuilder() << "retention_storage_bytes or retention_period_ms should be set";
return Ydb::StatusIds::BAD_REQUEST;
}
}

if (settings.has_partitions_count()) {
if (settings.partitions_count() > 0) {
minParts = settings.partitions_count();
Expand Down Expand Up @@ -816,28 +843,6 @@ namespace NKikimr::NGRpcProxy::V1 {
partConfig->SetMaxSizeInPartition(settings.max_partition_storage_size() ? settings.max_partition_storage_size() : Max<i64>());
partConfig->SetMaxCountInPartition(Max<i32>());

switch (settings.retention_case()) {
case Ydb::PersQueue::V1::TopicSettings::kRetentionPeriodMs: {
partConfig->SetLifetimeSeconds(Max(settings.retention_period_ms() / 1000ll, 1ll));
}
break;

case Ydb::PersQueue::V1::TopicSettings::kRetentionStorageBytes: {
if (settings.retention_storage_bytes() <= 0) {
error = TStringBuilder() << "retention_storage_bytes must be positive, provided " <<
settings.retention_storage_bytes();
return Ydb::StatusIds::BAD_REQUEST;
}
partConfig->SetStorageLimitBytes(settings.retention_storage_bytes());
}
break;

default: {
error = TStringBuilder() << "retention_storage_bytes or retention_period_ms should be set";
return Ydb::StatusIds::BAD_REQUEST;
}
}

if (settings.message_group_seqno_retention_period_ms() > 0 && settings.message_group_seqno_retention_period_ms() < settings.retention_period_ms()) {
error = TStringBuilder() << "message_group_seqno_retention_period_ms (provided " << settings.message_group_seqno_retention_period_ms() << ") must be more then retention_period_ms (provided " << settings.retention_period_ms() << ")";
return Ydb::StatusIds::BAD_REQUEST;
Expand Down Expand Up @@ -1077,6 +1082,10 @@ namespace NKikimr::NGRpcProxy::V1 {

auto pqTabletConfig = pqDescr->MutablePQTabletConfig();
auto partConfig = pqTabletConfig->MutablePartitionConfig();

if (request.retention_storage_mb())
partConfig->SetStorageLimitBytes(request.retention_storage_mb() * 1024 * 1024);

if (request.has_partitioning_settings()) {
const auto& settings = request.partitioning_settings();
if (settings.min_active_partitions() > 0) {
Expand Down Expand Up @@ -1162,9 +1171,6 @@ namespace NKikimr::NGRpcProxy::V1 {
partConfig->SetLifetimeSeconds(TDuration::Days(1).Seconds());
}

if (request.retention_storage_mb())
partConfig->SetStorageLimitBytes(request.retention_storage_mb() * 1024 * 1024);

if (local) {
auto partSpeed = request.partition_write_speed_bytes_per_second();
if (partSpeed == 0) {
Expand Down Expand Up @@ -1237,9 +1243,16 @@ namespace NKikimr::NGRpcProxy::V1 {
auto pqTabletConfig = pqDescr.MutablePQTabletConfig();
NPQ::Migrate(*pqTabletConfig);
auto partConfig = pqTabletConfig->MutablePartitionConfig();
if (request.has_alter_partitioning_settings()) {
auto splitMergeFeatureEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge();
auto splitMergeFeatureEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge();

if (request.has_set_retention_storage_mb()) {
CHECK_CDC;
partConfig->ClearStorageLimitBytes();
if (request.set_retention_storage_mb())
partConfig->SetStorageLimitBytes(request.set_retention_storage_mb() * 1024 * 1024);
}

if (request.has_alter_partitioning_settings()) {
const auto& settings = request.alter_partitioning_settings();
if (settings.has_set_min_active_partitions()) {
auto minParts = IfEqualThenDefault(settings.set_min_active_partitions(), 0L, 1L);
Expand All @@ -1248,6 +1261,7 @@ namespace NKikimr::NGRpcProxy::V1 {
pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts);
}
}

if (splitMergeFeatureEnabled) {
if (settings.has_set_max_active_partitions()) {
pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions());
Expand Down Expand Up @@ -1278,11 +1292,12 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
}
if (auto code = ValidatePartitionStrategy(*pqTabletConfig, error); code) {
return code->YdbCode;
}
}
}

if (splitMergeFeatureEnabled) {
auto code = ValidatePartitionStrategy(*pqTabletConfig, error);
if (code) return code->YdbCode;
}

if (request.alter_attributes().size()) {
Expand All @@ -1299,14 +1314,6 @@ namespace NKikimr::NGRpcProxy::V1 {
partConfig->SetLifetimeSeconds(request.set_retention_period().seconds());
}


if (request.has_set_retention_storage_mb()) {
CHECK_CDC;
partConfig->ClearStorageLimitBytes();
if (request.set_retention_storage_mb())
partConfig->SetStorageLimitBytes(request.set_retention_storage_mb() * 1024 * 1024);
}

bool local = true; //todo: check locality
if (local || pqConfig.GetTopicsAreFirstClassCitizen()) {
if (request.has_set_partition_write_speed_bytes_per_second()) {
Expand Down Expand Up @@ -1434,7 +1441,4 @@ namespace NKikimr::NGRpcProxy::V1 {

return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS);
}



}

0 comments on commit 72ab491

Please sign in to comment.