diff --git a/ydb/core/persqueue/ut/autoscaling_ut.cpp b/ydb/core/persqueue/ut/autoscaling_ut.cpp index 2ddf2bb96e44..ee9ece59a8e6 100644 --- a/ydb/core/persqueue/ut/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/autoscaling_ut.cpp @@ -571,6 +571,36 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold); } + Y_UNIT_TEST(ControlPlane_AutoscalingWithStorageSizeRetention) { + auto autoscalingTestTopic = "autoscalit-topic"; + TTopicSdkTestSetup setup = CreateSetup(); + TTopicClient client = setup.MakeClient(); + + TCreateTopicSettings createSettings; + createSettings + .RetentionStorageMb(1024) + .BeginConfigurePartitioningSettings() + .BeginConfigureAutoscalingSettings() + .Strategy(EAutoscalingStrategy::ScaleUp) + .EndConfigureAutoscalingSettings() + .EndConfigurePartitioningSettings(); + auto result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST); + + createSettings.RetentionStorageMb(0); + result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS); + + TAlterTopicSettings alterSettings; + alterSettings + .SetRetentionStorageMb(1024); + + result = client.AlterTopic(autoscalingTestTopic, alterSettings).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST); + } + Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) { TTopicSdkTestSetup setup = CreateSetup(); TTopicClient client = setup.MakeClient(); diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 464e8f4a0a43..4ef57cf5cea8 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -717,6 +717,10 @@ namespace NKikimr::NGRpcProxy::V1 { error = TStringBuilder() << "Partition scale threshold time must be greater then 1 second, provided " << strategy.GetScaleThresholdSeconds() << " seconds"; return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } + if (strategy.GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED && config.GetPartitionConfig().HasStorageLimitBytes()) { + error = TStringBuilder() << "Partitions autoscaling is incompatible with retention storage bytes option"; + return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); + } return std::nullopt; } @@ -736,6 +740,29 @@ namespace NKikimr::NGRpcProxy::V1 { auto minParts = 1; auto* pqTabletConfig = pqDescr->MutablePQTabletConfig(); auto partConfig = pqTabletConfig->MutablePartitionConfig(); + + switch (settings.retention_case()) { + case Ydb::PersQueue::V1::TopicSettings::kRetentionPeriodMs: { + partConfig->SetLifetimeSeconds(Max(settings.retention_period_ms() / 1000ll, 1ll)); + } + break; + + case Ydb::PersQueue::V1::TopicSettings::kRetentionStorageBytes: { + if (settings.retention_storage_bytes() <= 0) { + error = TStringBuilder() << "retention_storage_bytes must be positive, provided " << + settings.retention_storage_bytes(); + return Ydb::StatusIds::BAD_REQUEST; + } + partConfig->SetStorageLimitBytes(settings.retention_storage_bytes()); + } + break; + + default: { + error = TStringBuilder() << "retention_storage_bytes or retention_period_ms should be set"; + return Ydb::StatusIds::BAD_REQUEST; + } + } + if (settings.has_partitions_count()) { if (settings.partitions_count() > 0) { minParts = settings.partitions_count(); @@ -816,28 +843,6 @@ namespace NKikimr::NGRpcProxy::V1 { partConfig->SetMaxSizeInPartition(settings.max_partition_storage_size() ? settings.max_partition_storage_size() : Max()); partConfig->SetMaxCountInPartition(Max()); - switch (settings.retention_case()) { - case Ydb::PersQueue::V1::TopicSettings::kRetentionPeriodMs: { - partConfig->SetLifetimeSeconds(Max(settings.retention_period_ms() / 1000ll, 1ll)); - } - break; - - case Ydb::PersQueue::V1::TopicSettings::kRetentionStorageBytes: { - if (settings.retention_storage_bytes() <= 0) { - error = TStringBuilder() << "retention_storage_bytes must be positive, provided " << - settings.retention_storage_bytes(); - return Ydb::StatusIds::BAD_REQUEST; - } - partConfig->SetStorageLimitBytes(settings.retention_storage_bytes()); - } - break; - - default: { - error = TStringBuilder() << "retention_storage_bytes or retention_period_ms should be set"; - return Ydb::StatusIds::BAD_REQUEST; - } - } - if (settings.message_group_seqno_retention_period_ms() > 0 && settings.message_group_seqno_retention_period_ms() < settings.retention_period_ms()) { error = TStringBuilder() << "message_group_seqno_retention_period_ms (provided " << settings.message_group_seqno_retention_period_ms() << ") must be more then retention_period_ms (provided " << settings.retention_period_ms() << ")"; return Ydb::StatusIds::BAD_REQUEST; @@ -1077,6 +1082,10 @@ namespace NKikimr::NGRpcProxy::V1 { auto pqTabletConfig = pqDescr->MutablePQTabletConfig(); auto partConfig = pqTabletConfig->MutablePartitionConfig(); + + if (request.retention_storage_mb()) + partConfig->SetStorageLimitBytes(request.retention_storage_mb() * 1024 * 1024); + if (request.has_partitioning_settings()) { const auto& settings = request.partitioning_settings(); if (settings.min_active_partitions() > 0) { @@ -1162,9 +1171,6 @@ namespace NKikimr::NGRpcProxy::V1 { partConfig->SetLifetimeSeconds(TDuration::Days(1).Seconds()); } - if (request.retention_storage_mb()) - partConfig->SetStorageLimitBytes(request.retention_storage_mb() * 1024 * 1024); - if (local) { auto partSpeed = request.partition_write_speed_bytes_per_second(); if (partSpeed == 0) { @@ -1237,9 +1243,16 @@ namespace NKikimr::NGRpcProxy::V1 { auto pqTabletConfig = pqDescr.MutablePQTabletConfig(); NPQ::Migrate(*pqTabletConfig); auto partConfig = pqTabletConfig->MutablePartitionConfig(); - if (request.has_alter_partitioning_settings()) { - auto splitMergeFeatureEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge(); + auto splitMergeFeatureEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge(); + if (request.has_set_retention_storage_mb()) { + CHECK_CDC; + partConfig->ClearStorageLimitBytes(); + if (request.set_retention_storage_mb()) + partConfig->SetStorageLimitBytes(request.set_retention_storage_mb() * 1024 * 1024); + } + + if (request.has_alter_partitioning_settings()) { const auto& settings = request.alter_partitioning_settings(); if (settings.has_set_min_active_partitions()) { auto minParts = IfEqualThenDefault(settings.set_min_active_partitions(), 0L, 1L); @@ -1248,6 +1261,7 @@ namespace NKikimr::NGRpcProxy::V1 { pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts); } } + if (splitMergeFeatureEnabled) { if (settings.has_set_max_active_partitions()) { pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions()); @@ -1278,11 +1292,12 @@ namespace NKikimr::NGRpcProxy::V1 { } } } - if (auto code = ValidatePartitionStrategy(*pqTabletConfig, error); code) { - return code->YdbCode; - } } + } + if (splitMergeFeatureEnabled) { + auto code = ValidatePartitionStrategy(*pqTabletConfig, error); + if (code) return code->YdbCode; } if (request.alter_attributes().size()) { @@ -1299,14 +1314,6 @@ namespace NKikimr::NGRpcProxy::V1 { partConfig->SetLifetimeSeconds(request.set_retention_period().seconds()); } - - if (request.has_set_retention_storage_mb()) { - CHECK_CDC; - partConfig->ClearStorageLimitBytes(); - if (request.set_retention_storage_mb()) - partConfig->SetStorageLimitBytes(request.set_retention_storage_mb() * 1024 * 1024); - } - bool local = true; //todo: check locality if (local || pqConfig.GetTopicsAreFirstClassCitizen()) { if (request.has_set_partition_write_speed_bytes_per_second()) { @@ -1434,7 +1441,4 @@ namespace NKikimr::NGRpcProxy::V1 { return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS); } - - - }