From b5badc1c92480ad3ee0d22a024c1d43865804c85 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 29 Feb 2024 15:08:03 +0000 Subject: [PATCH] replace ImportantClientIds --- ydb/core/persqueue/partition.cpp | 16 ++++++++------ ydb/core/persqueue/partition_read.cpp | 15 ++++++++----- ydb/core/persqueue/utils.cpp | 21 ++++++++++--------- ydb/core/persqueue/utils.h | 2 -- ydb/core/protos/pqconfig.proto | 3 ++- ydb/services/lib/actors/pq_schema_actor.cpp | 20 ++++++++++++------ .../persqueue_v1/actors/schema_actors.cpp | 4 ++-- 7 files changed, 49 insertions(+), 32 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index fb1f30ded888..9d19fe57681c 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -279,11 +279,13 @@ ui64 TPartition::GetUsedStorage(const TActorContext& ctx) { } ui64 TPartition::ImportantClientsMinOffset() const { - const auto& partConfig = Config.GetPartitionConfig(); - ui64 minOffset = EndOffset; - for (const auto& importantClientId : partConfig.GetImportantClientId()) { - const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId); + for (const auto& consumer : Config.GetConsumers()) { + if (!consumer.GetImportant()) { + continue; + } + + const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName()); ui64 curOffset = StartOffset; if (userInfo && userInfo->Offset >= 0) //-1 means no offset curOffset = userInfo->Offset; @@ -1773,8 +1775,10 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co } TSet important; - for (const auto& importantUser : config.GetPartitionConfig().GetImportantClientId()) { - important.insert(importantUser); + for (const auto& consumer : config.GetConsumers()) { + if (consumer.GetImportant()) { + important.insert(consumer.GetName()); + } } for (auto& consumer : config.GetConsumers()) { diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 188586c095d8..f5070d2b23d9 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -180,9 +180,14 @@ void TPartition::Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { TSet important; - for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) { - important.insert(importantUser); - TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantUser); + for (const auto& consumer : Config.GetConsumers()) { + if (!consumer.GetImportant()) { + continue; + } + + important.insert(consumer.GetName()); + + TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName()); if (userInfo && !userInfo->Important && userInfo->LabeledCounters) { ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup())); userInfo->SetImportant(true); @@ -190,12 +195,12 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { } if (!userInfo) { userInfo = &UsersInfoStorage->Create( - ctx, importantUser, 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {} + ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {} ); } if (userInfo->Offset < (i64)StartOffset) userInfo->Offset = StartOffset; - ReadTimestampForOffset(importantUser, *userInfo, ctx); + ReadTimestampForOffset(consumer.GetName(), *userInfo, ctx); } for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) { diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index a7506ffec781..08e8f1fcb961 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -45,6 +45,16 @@ ui64 PutUnitsSize(const ui64 size) { return putUnitsCount; } +bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { + for (const auto& i : config.GetPartitionConfig().GetImportantClientId()) { + if (consumerName == i) { + return true; + } + } + + return false; +} + void Migrate(NKikimrPQ::TPQTabletConfig& config) { // if ReadRules isn`t empty than it is old configuration format // when modify new format (add or alter a consumer) readRules is cleared @@ -75,6 +85,7 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) { if (i < config.ReadRuleGenerationsSize()) { consumer->SetGeneration(config.GetReadRuleGenerations(i)); } + consumer->SetImportant(IsImportantClient(config, consumer.GetName())); } } } @@ -93,16 +104,6 @@ size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config) { return config.ConsumersSize(); } -bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { - for (const auto& i : config.GetPartitionConfig().GetImportantClientId()) { - if (consumerName == i) { - return true; - } - } - - return false; -} - const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId) { for(const auto& p : config.GetPartitions()) { if (partitionId == p.GetPartitionId()) { diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index b05658d0c605..1952f5912b6d 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -22,8 +22,6 @@ constexpr bool ReadRuleCompatible() { return true; } bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName); size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config); -bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName); - const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId); // The graph of split-merge operations. diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index d846313fb4b8..4471ebd1907c 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -235,7 +235,7 @@ message TPartitionConfig { optional uint64 StorageLimitBytes = 16; // List of ClientIds, for which we don't delete data until they are read by these clients - repeated string ImportantClientId = 4; //can be empty + repeated string ImportantClientId = 4; //can be empty . Deprecated. Use Consumer.Important optional uint32 LowWatermark = 5 [default = 6291456]; //6Mb, compact blobs if they at least this big. optional uint32 SourceIdLifetimeSeconds = 6 [ default = 1382400]; //16 days optional uint32 SourceIdMaxCounts = 31 [default = 6000000]; // Maximum number of stored sourceId records in partition @@ -332,6 +332,7 @@ message TPQTabletConfig { optional EConsumerScalingSupport ScalingSupport = 6; optional uint64 Version = 7 [default = 0]; optional uint64 Generation = 8 [default = 0]; + optional bool Important = 9 [default = false]; } repeated TConsumer Consumers = 37; diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 33bfab78447e..3ff734d11d48 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -166,7 +166,10 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ); } - config->MutablePartitionConfig()->AddImportantClientId(consumerName); + consumer->SetImportant(true); + if (NPQ::ReadRuleCompatible()) { + config->MutablePartitionConfig()->AddImportantClientId(consumerName); + } } if (!rr.service_type().empty()) { @@ -350,7 +353,10 @@ namespace NKikimr::NGRpcProxy::V1 { if (pqConfig.GetTopicsAreFirstClassCitizen() && !AppData(ctx)->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) { return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } - config->MutablePartitionConfig()->AddImportantClientId(consumerName); + consumer->SetImportant(true); + if (NPQ::ReadRuleCompatible()) { + config->MutablePartitionConfig()->AddImportantClientId(consumerName); + } } return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); @@ -372,9 +378,11 @@ namespace NKikimr::NGRpcProxy::V1 { config->ClearReadRuleServiceTypes(); config->ClearConsumers(); - for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) { - if (importantConsumer != consumerName) { - config->MutablePartitionConfig()->AddImportantClientId(importantConsumer); + if (NPQ::ReadRuleCompatible()) { + for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) { + if (importantConsumer != consumerName) { + config->MutablePartitionConfig()->AddImportantClientId(importantConsumer); + } } } @@ -1272,7 +1280,7 @@ namespace NKikimr::NGRpcProxy::V1 { consumers.push_back({false, Ydb::Topic::Consumer{}}); // do not check service type for presented consumers auto& consumer = consumers.back().second; consumer.set_name(name); - consumer.set_important(NPQ::IsImportantClient(*config, oldName)); + consumer.set_important(c.GetImportant()); consumer.mutable_read_from()->set_seconds(c.GetReadFromTimestampsMs() / 1000); (*consumer.mutable_attributes())["_service_type"] = c.GetServiceType(); (*consumer.mutable_attributes())["_version"] = TStringBuilder() << c.GetVersion(); diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 6db48045ec71..2f043e4f1670 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -146,7 +146,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T for (const auto &codec : consumer.GetCodec().GetIds()) { rr->add_supported_codecs((Ydb::PersQueue::V1::Codec) (codec + 1)); } - rr->set_important(NPQ::IsImportantClient(config, consumer.GetName())); + rr->set_important(consumer.GetImportant()); if (consumer.HasServiceType()) { rr->set_service_type(consumer.GetServiceType()); @@ -1036,7 +1036,7 @@ bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfi rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1)); } - rr->set_important(NPQ::IsImportantClient(config, consumer.GetName())); + rr->set_important(consumer.GetImportant()); TString serviceType = ""; if (consumer.HasServiceType()) { serviceType = consumer.GetServiceType();