diff --git a/ydb/core/kqp/topics/kqp_topics.cpp b/ydb/core/kqp/topics/kqp_topics.cpp index 2e603e754dd7..a3ce9e76554c 100644 --- a/ydb/core/kqp/topics/kqp_topics.cpp +++ b/ydb/core/kqp/topics/kqp_topics.cpp @@ -1,6 +1,7 @@ #include "kqp_topics.h" #include +#include #include #define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) @@ -294,16 +295,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac result.PQGroupInfo->Description; if (Consumer_) { - bool found = false; - - for (auto& consumer : description.GetPQTabletConfig().GetReadRules()) { - if (Consumer_ == consumer) { - found = true; - break; - } - } - - if (!found) { + if (!NPQ::HasConsumer(description.GetPQTabletConfig(), *Consumer_)) { builder << "Unknown consumer '" << *Consumer_ << "'"; status = Ydb::StatusIds::BAD_REQUEST; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index a5a25bec724d..71c8a1bcf911 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -279,11 +279,13 @@ ui64 TPartition::GetUsedStorage(const TActorContext& ctx) { } ui64 TPartition::ImportantClientsMinOffset() const { - const auto& partConfig = Config.GetPartitionConfig(); - ui64 minOffset = EndOffset; - for (const auto& importantClientId : partConfig.GetImportantClientId()) { - const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId); + for (const auto& consumer : Config.GetConsumers()) { + if (!consumer.GetImportant()) { + continue; + } + + const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName()); ui64 curOffset = StartOffset; if (userInfo && userInfo->Offset >= 0) //-1 means no offset curOffset = userInfo->Offset; @@ -544,7 +546,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { InitDone = true; TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds()); - FillReadFromTimestamps(Config, ctx); + FillReadFromTimestamps(ctx); ResendPendingEvents(ctx); ProcessTxsAndUserActs(ctx); @@ -1791,29 +1793,30 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co } TSet important; - for (const auto& importantUser : config.GetPartitionConfig().GetImportantClientId()) { - important.insert(importantUser); + for (const auto& consumer : config.GetConsumers()) { + if (consumer.GetImportant()) { + important.insert(consumer.GetName()); + } } - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - const auto& consumer = config.GetReadRules(i); - auto& userInfo = GetOrCreatePendingUser(consumer, 0); + for (auto& consumer : config.GetConsumers()) { + auto& userInfo = GetOrCreatePendingUser(consumer.GetName(), 0); - TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); + TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs()); if (!ts) { ts += TDuration::MilliSeconds(1); } userInfo.ReadFromTimestamp = ts; - userInfo.Important = important.contains(consumer); + userInfo.Important = important.contains(consumer.GetName()); - ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0; + ui64 rrGen = consumer.GetGeneration(); if (userInfo.ReadRuleGeneration != rrGen) { - TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0, 0, TActorId{}, + TEvPQ::TEvSetClientInfo act(0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen); ProcessUserAct(act, ctx); } - hasReadRule.erase(consumer); + hasReadRule.erase(consumer.GetName()); } for (auto& consumer : hasReadRule) { @@ -1915,8 +1918,6 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0); - UsersInfoStorage->UpdateConfig(Config); - Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 40d1f8e28b4f..dd12f65022cb 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -117,7 +117,7 @@ class TPartition : public TActorBootstrapped { void CreateMirrorerActor(); void DoRead(TEvPQ::TEvRead::TPtr&& ev, TDuration waitQuotaTime, const TActorContext& ctx); void FailBadClient(const TActorContext& ctx); - void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx); + void FillReadFromTimestamps(const TActorContext& ctx); void FilterDeadlinedWrites(const TActorContext& ctx); void Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 407bdb7a92a5..311dc01757f6 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -163,6 +163,9 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon switch (response.GetStatus()) { case NKikimrProto::OK: Y_ABORT_UNLESS(Partition()->Config.ParseFromString(response.GetValue())); + + Migrate(Partition()->Config); + if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) { auto event = MakeHolder(Partition()->TopicConverter, Partition()->TabletConfig); diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 3cf09386dfe1..f5070d2b23d9 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -33,7 +33,7 @@ void TPartition::SendReadingFinished(const TString& consumer) { Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId)); } -void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) { +void TPartition::FillReadFromTimestamps(const TActorContext& ctx) { TSet hasReadRule; for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { @@ -41,14 +41,14 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config userInfo.HasReadRule = false; hasReadRule.insert(consumer); } - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - const auto& consumer = config.GetReadRules(i); - auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0); + + for (auto& consumer : Config.GetConsumers()) { + auto& userInfo = UsersInfoStorage->GetOrCreate(consumer.GetName(), ctx, 0); userInfo.HasReadRule = true; - ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0; - if (userInfo.ReadRuleGeneration != rrGen) { + + if (userInfo.ReadRuleGeneration != consumer.GetGeneration()) { THolder event = MakeHolder( - 0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen + 0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, consumer.GetGeneration() ); // // TODO(abcdef): заменить на вызов ProcessUserAct @@ -61,12 +61,13 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config } userInfo.Step = userInfo.Generation = 0; } - hasReadRule.erase(consumer); - TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); + hasReadRule.erase(consumer.GetName()); + TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs()); if (!ts) ts += TDuration::MilliSeconds(1); if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts) userInfo.ReadFromTimestamp = ts; } + for (auto& consumer : hasReadRule) { auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx); if (userInfo.NoConsumer) { @@ -179,9 +180,14 @@ void TPartition::Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { TSet important; - for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) { - important.insert(importantUser); - TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantUser); + for (const auto& consumer : Config.GetConsumers()) { + if (!consumer.GetImportant()) { + continue; + } + + important.insert(consumer.GetName()); + + TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName()); if (userInfo && !userInfo->Important && userInfo->LabeledCounters) { ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup())); userInfo->SetImportant(true); @@ -189,12 +195,12 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { } if (!userInfo) { userInfo = &UsersInfoStorage->Create( - ctx, importantUser, 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {} + ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {} ); } if (userInfo->Offset < (i64)StartOffset) userInfo->Offset = StartOffset; - ReadTimestampForOffset(importantUser, *userInfo, ctx); + ReadTimestampForOffset(consumer.GetName(), *userInfo, ctx); } for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) { diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 53b485d32063..629e7edbef42 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -4,6 +4,8 @@ #include "partition_log.h" #include "partition.h" #include "read.h" +#include "utils.h" + #include #include #include @@ -1013,6 +1015,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& bool res = Config.ParseFromString(read.GetValue()); Y_ABORT_UNLESS(res); + Migrate(Config); + if (!Config.PartitionsSize()) { for (const auto partitionId : Config.GetPartitionIds()) { Config.AddPartitions()->SetPartitionId(partitionId); @@ -1145,10 +1149,12 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) { } auto countReadRulesWithPricing = [&](const TActorContext& ctx, const auto& config) { + auto& defaultClientServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName(); + ui32 result = 0; - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - TString rrServiceType = config.ReadRuleServiceTypesSize() <= i ? "" : config.GetReadRuleServiceTypes(i); - if (rrServiceType.empty() || rrServiceType == AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName()) + for (auto& consumer : config.GetConsumers()) { + TString serviceType = consumer.GetServiceType(); + if (serviceType.empty() || serviceType == defaultClientServiceType) ++result; } return result; @@ -1618,24 +1624,27 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr> existed; // map name -> rrVersion, rrGeneration - for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) { - auto version = i < Config.ReadRuleVersionsSize() ? Config.GetReadRuleVersions(i) : 0; - auto generation = i < Config.ReadRuleGenerationsSize() ? Config.GetReadRuleGenerations(i) : 0; - existed[Config.GetReadRules(i)] = std::make_pair(version, generation); + for (const auto& c : Config.GetConsumers()) { + existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration()); } - for (ui32 i = 0; i < cfg.ReadRulesSize(); ++i) { - auto version = i < cfg.ReadRuleVersionsSize() ? cfg.GetReadRuleVersions(i) : 0; - auto it = existed.find(cfg.GetReadRules(i)); + + for (auto& c : *cfg.MutableConsumers()) { + auto it = existed.find(c.GetName()); ui64 generation = 0; - if (it != existed.end() && it->second.first == version) { + if (it != existed.end() && it->second.first == c.GetVersion()) { generation = it->second.second; } else { generation = curConfigVersion; } - cfg.AddReadRuleGenerations(generation); + c.SetGeneration(generation); + if (ReadRuleCompatible()) { + cfg.AddReadRuleGenerations(generation); + } } } @@ -1648,7 +1657,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtrTabletConfig.ParseFromString(config); Y_ABORT_UNLESS(res); + + Migrate(Self->TabletConfig); Self->Consumers.clear(); - if (Self->TabletConfig.ReadRulesSize() == Self->TabletConfig.ConsumerScalingSupportSize()) { - for (size_t i = 0; i < Self->TabletConfig.ReadRulesSize(); ++i) { - Self->Consumers[Self->TabletConfig.GetReadRules(i)].ScalingSupport = Self->TabletConfig.GetConsumerScalingSupport(i); - } - } else { - for (const auto& rr : Self->TabletConfig.GetReadRules()) { - Self->Consumers[rr].ScalingSupport = DefaultScalingSupport(); - } + for (auto& consumer : Self->TabletConfig.GetConsumers()) { + Self->Consumers[consumer.GetName()].ScalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport(); } Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig); @@ -532,10 +528,12 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr Version = record.GetVersion(); MaxPartsPerTablet = record.GetPartitionPerTablet(); PathId = record.GetPathId(); - Topic = record.GetTopicName(); - Path = record.GetPath(); + Topic = std::move(record.GetTopicName()); + Path = std::move(record.GetPath()); TxId = record.GetTxId(); - TabletConfig = record.GetTabletConfig(); + TabletConfig = std::move(record.GetTabletConfig()); + Migrate(TabletConfig); + SchemeShardId = record.GetSchemeShardId(); TotalGroups = record.HasTotalGroupCount() ? record.GetTotalGroupCount() : 0; ui32 prevNextPartitionId = NextPartitionId; @@ -547,17 +545,15 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr auto oldConsumers = std::move(Consumers); Consumers.clear(); - for (size_t i = 0; i < TabletConfig.ReadRulesSize(); ++i) { - auto& rr = TabletConfig.GetReadRules(i); + for (auto& consumer : TabletConfig.GetConsumers()) { + auto scalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport(); - auto scalingSupport = i < TabletConfig.ConsumerScalingSupportSize() ? TabletConfig.GetConsumerScalingSupport(i) - : DefaultScalingSupport(); - auto it = oldConsumers.find(rr); + auto it = oldConsumers.find(consumer.GetName()); if (it != oldConsumers.end()) { - auto& c = Consumers[rr] = std::move(it->second); + auto& c = Consumers[consumer.GetName()] = std::move(it->second); c.ScalingSupport = scalingSupport; } else { - Consumers[rr].ScalingSupport = scalingSupport; + Consumers[consumer.GetName()].ScalingSupport = scalingSupport; } } diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 546b4fa05adb..ba9c9ba1fd5f 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -70,6 +70,8 @@ void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransactio TabletConfig = tx.GetTabletConfig(); BootstrapConfig = tx.GetBootstrapConfig(); + Migrate(TabletConfig); + InitPartitions(); } @@ -157,6 +159,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans TabletConfig = txBody.GetTabletConfig(); BootstrapConfig = txBody.GetBootstrapConfig(); + Migrate(TabletConfig); + TPartitionGraph graph = MakePartitionGraph(TabletConfig); for (const auto& p : TabletConfig.GetPartitions()) { diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp index 5a13af0ad669..92bf080dbf5c 100644 --- a/ydb/core/persqueue/user_info.cpp +++ b/ydb/core/persqueue/user_info.cpp @@ -181,9 +181,9 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx, { TString defaultServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName(); TString userServiceType = ""; - for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) { - if (Config.GetReadRules(i) == user) { - userServiceType = Config.ReadRuleServiceTypesSize() > i ? Config.GetReadRuleServiceTypes(i) : ""; + for (auto& consumer : Config.GetConsumers()) { + if (consumer.GetName() == user) { + userServiceType = consumer.GetServiceType(); break; } } diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 328fdef7165a..25fd902c951f 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -383,10 +383,6 @@ class TUsersInfoStorage { const TUserInfo* GetIfExists(const TString& user) const; TUserInfo* GetIfExists(const TString& user); - void UpdateConfig(const NKikimrPQ::TPQTabletConfig& config) { - Config = config; - } - THashMap& GetAll(); TUserInfoBase CreateUserInfo(const TString& user, @@ -422,7 +418,7 @@ class TUsersInfoStorage { TMaybe TabletActor; TMaybe PartitionActor; - NKikimrPQ::TPQTabletConfig Config; + const NKikimrPQ::TPQTabletConfig& Config; TString CloudId; TString DbId; diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index 5aee82f0b911..e0ca19454505 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -49,7 +50,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, } request->Record.MutableTabletConfig()->SetCacheSize(10_MB); request->Record.SetTxId(12345); - auto tabletConfig = request->Record.MutableTabletConfig(); + auto* tabletConfig = request->Record.MutableTabletConfig(); if (runtime.GetAppData().PQConfig.GetTopicsAreFirstClassCitizen()) { tabletConfig->SetTopicName("topic"); tabletConfig->SetTopicPath(runtime.GetAppData().PQConfig.GetDatabase() + "/topic"); @@ -93,6 +94,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, if (u.first != "user") tabletConfig->AddReadRules(u.first); } + runtime.SendToPipe(tabletId, edge, request.Release(), 0, GetPipeConfigWithRetries()); TEvPersQueue::TEvUpdateConfigResponse* result = runtime.GrabEdgeEvent(handle); @@ -136,7 +138,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, } -void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestContext& tc, i64 ctime, +void CmdGetOffset(const ui32 partition, const TString& user, i64 expectedOffset, TTestContext& tc, i64 ctime, ui64 writeTime) { TAutoPtr handle; TEvPersQueue::TEvResponse *result; @@ -174,7 +176,7 @@ void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestCo } } } - UNIT_ASSERT((offset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == offset); + UNIT_ASSERT((expectedOffset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == expectedOffset); if (writeTime > 0) { UNIT_ASSERT(resp.HasWriteTimestampEstimateMS()); UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime); diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index f3e90c12d092..bf2394080763 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -487,7 +487,7 @@ TActorId CmdCreateSession(const TPQCmdSettings& settings, TTestContext& tc); void CmdGetOffset( const ui32 partition, const TString& user, - i64 offset, + i64 expectedOffset, TTestContext& tc, i64 ctime = -1, ui64 writeTime = 0); diff --git a/ydb/core/persqueue/ut/make_config.cpp b/ydb/core/persqueue/ut/make_config.cpp index ceeed50b440a..8fbe5cfb63c3 100644 --- a/ydb/core/persqueue/ut/make_config.cpp +++ b/ydb/core/persqueue/ut/make_config.cpp @@ -1,5 +1,7 @@ #include "make_config.h" +#include + namespace NKikimr::NPQ::NHelpers { NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version, @@ -27,6 +29,8 @@ NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version, config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + Migrate(config); + return config; } diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 9af2d4c4e71a..ec3680b0cca4 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -397,7 +397,6 @@ Y_UNIT_TEST(TestUserInfoCompatibility) { CmdGetOffset(1, client, 1, tc); CmdGetOffset(2, client, 1, tc); CmdGetOffset(3, client, 1, tc); - }); } diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 6008d848e232..f9589c40416f 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -45,6 +45,65 @@ ui64 PutUnitsSize(const ui64 size) { return putUnitsCount; } +bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { + for (const auto& i : config.GetPartitionConfig().GetImportantClientId()) { + if (consumerName == i) { + return true; + } + } + + return false; +} + +void Migrate(NKikimrPQ::TPQTabletConfig& config) { + // if ReadRules isn`t empty than it is old configuration format + // when modify new format (add or alter a consumer) readRules is cleared + if (config.ReadRulesSize()) { + config.ClearConsumers(); + + for(size_t i = 0; i < config.ReadRulesSize(); ++i) { + auto* consumer = config.AddConsumers(); + + consumer->SetName(config.GetReadRules(i)); + if (i < config.ReadFromTimestampsMsSize()) { + consumer->SetReadFromTimestampsMs(config.GetReadFromTimestampsMs(i)); + } + if (i < config.ConsumerFormatVersionsSize()) { + consumer->SetFormatVersion(config.GetConsumerFormatVersions(i)); + } + if (i < config.ConsumerCodecsSize()) { + auto& src = config.GetConsumerCodecs(i); + auto* dst = consumer->MutableCodec(); + dst->CopyFrom(src); + } + if (i < config.ReadRuleServiceTypesSize()) { + consumer->SetServiceType(config.GetReadRuleServiceTypes(i)); + } + if (i < config.ReadRuleVersionsSize()) { + consumer->SetVersion(config.GetReadRuleVersions(i)); + } + if (i < config.ReadRuleGenerationsSize()) { + consumer->SetGeneration(config.GetReadRuleGenerations(i)); + } + consumer->SetImportant(IsImportantClient(config, consumer->GetName())); + } + } +} + +bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { + for (auto& cons : config.GetConsumers()) { + if (cons.GetName() == consumerName) { + return true; + } + } + + return false; +} + +size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config) { + return config.ConsumersSize(); +} + const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId) { for(const auto& p : config.GetPartitions()) { if (partitionId == p.GetPartitionId()) { diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index df6fe683a907..1952f5912b6d 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -14,6 +14,14 @@ ui64 PutUnitsSize(const ui64 size); TString SourceIdHash(const TString& sourceId); +void Migrate(NKikimrPQ::TPQTabletConfig& config); + +// This function required for marking the code which required remove after 25-1 +constexpr bool ReadRuleCompatible() { return true; } + +bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName); +size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config); + const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId); // The graph of split-merge operations. diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 5817c6c93fa3..4471ebd1907c 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -235,7 +235,7 @@ message TPartitionConfig { optional uint64 StorageLimitBytes = 16; // List of ClientIds, for which we don't delete data until they are read by these clients - repeated string ImportantClientId = 4; //can be empty + repeated string ImportantClientId = 4; //can be empty . Deprecated. Use Consumer.Important optional uint32 LowWatermark = 5 [default = 6291456]; //6Mb, compact blobs if they at least this big. optional uint32 SourceIdLifetimeSeconds = 6 [ default = 1382400]; //16 days optional uint32 SourceIdMaxCounts = 31 [default = 6000000]; // Maximum number of stored sourceId records in partition @@ -307,22 +307,35 @@ message TPQTabletConfig { optional string DC = 12; // ReadRules, ReadTopicTimestampMs, ReadRuleVersions, ConsumerFormatVersions, ConsumersCodecs and ConsumerScalingSupport form a consumer data array stored by columns - repeated string ReadRules = 13; - repeated uint64 ReadFromTimestampsMs = 14; - repeated uint64 ConsumerFormatVersions = 15; + repeated string ReadRules = 13; // Deprecated. Use Consumers.Name + repeated uint64 ReadFromTimestampsMs = 14; // Deprecated. Use Consumers.ReadFromTimestampsMs + repeated uint64 ConsumerFormatVersions = 15; // Deprecated. Use Consumers.FormatVersion message TCodecs { repeated int64 Ids = 1; repeated string Codecs = 2; } - repeated TCodecs ConsumerCodecs = 16; - repeated EConsumerScalingSupport ConsumerScalingSupport = 37; - repeated string ReadRuleServiceTypes = 17; + repeated TCodecs ConsumerCodecs = 16; // Deprecated. Use Consumers.Codec + repeated string ReadRuleServiceTypes = 17; // Deprecated. Use Consumers.ServiceType optional uint64 FormatVersion = 20; optional TCodecs Codecs = 21; - repeated uint64 ReadRuleVersions = 22; - repeated uint64 ReadRuleGenerations = 32; + repeated uint64 ReadRuleVersions = 22; // Deprecated. Use Consumers.Version + repeated uint64 ReadRuleGenerations = 32; // Deprecated. Use Consumers.Generation + + message TConsumer { + optional string Name = 1; + optional uint64 ReadFromTimestampsMs = 2 [default = 0]; + optional uint64 FormatVersion = 3 [default = 0]; + optional TCodecs Codec = 4; + optional string ServiceType = 5; + optional EConsumerScalingSupport ScalingSupport = 6; + optional uint64 Version = 7 [default = 0]; + optional uint64 Generation = 8 [default = 0]; + optional bool Important = 9 [default = false]; + } + + repeated TConsumer Consumers = 37; optional string TopicPath = 23; diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 96443917beb6..3230bfef94c6 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -1481,6 +1482,7 @@ class TSchemeCache: public TMonitorableActor { Kind = TNavigate::KindTopic; IsPrivatePath = CalcPathIsPrivate(entryDesc.GetPathType(), entryDesc.GetPathSubType()); if (Created) { + NPQ::Migrate(*pathDesc.MutablePersQueueGroup()->MutablePQTabletConfig()); FillInfo(Kind, PQGroupInfo, std::move(*pathDesc.MutablePersQueueGroup())); } break; diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 8b2e373acf28..cac2f2ef9d62 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -240,9 +240,8 @@ namespace NKikimr::NDataStreams::V1 { const auto& response = ev->Get()->Request.Get()->ResultSet.front(); const auto& pqGroupDescription = response.PQGroupInfo->Description; - const auto& readRules = pqGroupDescription.GetPQTabletConfig().GetReadRules(); - if (readRules.size() > 0 && EnforceDeletion == false) { + if (NPQ::ConsumerCount(pqGroupDescription.GetPQTabletConfig()) > 0 && EnforceDeletion == false) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::IN_USE), TStringBuilder() << "Stream has registered consumers" << "and EnforceConsumerDeletion flag is false", ActorContext()); @@ -983,24 +982,25 @@ namespace NKikimr::NDataStreams::V1 { ui32 leftToRead{0}; const auto& response = result->ResultSet.front(); const auto& pqGroupDescription = response.PQGroupInfo->Description; - const auto& streamReadRulesNames = pqGroupDescription.GetPQTabletConfig().GetReadRules(); - const auto& streamReadRulesReadFromTimestamps = pqGroupDescription.GetPQTabletConfig().GetReadFromTimestampsMs(); + const auto& streamConsumers = pqGroupDescription.GetPQTabletConfig().GetConsumers(); const auto alreadyRead = NextToken.GetAlreadyRead(); - if (alreadyRead > (ui32)streamReadRulesNames.size()) { + ui32 consumerCount = NPQ::ConsumerCount(pqGroupDescription.GetPQTabletConfig()); + + if (alreadyRead > consumerCount) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Provided next_token is malformed - " << "everything is already read", ActorContext()); } - const auto rulesToRead = std::min(streamReadRulesNames.size() - alreadyRead, MaxResults); + const auto rulesToRead = std::min(consumerCount - alreadyRead, MaxResults); readRules.reserve(rulesToRead); - auto itName = streamReadRulesNames.begin() + alreadyRead; - auto itTs = streamReadRulesReadFromTimestamps.begin() + alreadyRead; - for (auto i = rulesToRead; i > 0; --i, ++itName, ++itTs) { - readRules.push_back({*itName, *itTs}); + + auto consumer = streamConsumers.begin() + alreadyRead; + for (auto i = rulesToRead; i > 0; --i, ++consumer) { + readRules.push_back({consumer->GetName(), consumer->GetReadFromTimestampsMs()}); } - leftToRead = streamReadRulesNames.size() - alreadyRead - rulesToRead; + leftToRead = consumerCount - alreadyRead - rulesToRead; SendResponse(ActorContext(), readRules, leftToRead); } @@ -1929,7 +1929,7 @@ namespace NKikimr::NDataStreams::V1 { : Ydb::DataStreams::V1::StreamDescription::CREATING ); descriptionSummary.set_open_shard_count(PQGroup.GetPartitions().size()); - descriptionSummary.set_consumer_count(PQGroup.MutablePQTabletConfig()->GetReadRules().size()); + descriptionSummary.set_consumer_count(NPQ::ConsumerCount(PQGroup.GetPQTabletConfig())); descriptionSummary.set_encryption_type(Ydb::DataStreams::V1::EncryptionType::NONE); Request_->SendResult(result, Ydb::StatusIds::SUCCESS); diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index c6564b247985..3ff734d11d48 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -92,7 +93,12 @@ namespace NKikimr::NGRpcProxy::V1 { } } - config->AddReadRules(consumerName); + auto* consumer = config->AddConsumers(); + + consumer->SetName(consumerName); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRules(consumerName); + } if (rr.starting_message_timestamp_ms() < 0) { return TMsgPqCodes( @@ -100,7 +106,10 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ); } - config->AddReadFromTimestampsMs(rr.starting_message_timestamp_ms()); + consumer->SetReadFromTimestampsMs(rr.starting_message_timestamp_ms()); + if (NPQ::ReadRuleCompatible()) { + config->AddReadFromTimestampsMs(rr.starting_message_timestamp_ms()); + } if (!Ydb::PersQueue::V1::TopicSettings::Format_IsValid((int)rr.supported_format()) || rr.supported_format() == 0) { return TMsgPqCodes( @@ -108,7 +117,10 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ); } - config->AddConsumerFormatVersions(rr.supported_format() - 1); + consumer->SetFormatVersion(rr.supported_format() - 1); + if (NPQ::ReadRuleCompatible()) { + config->AddConsumerFormatVersions(rr.supported_format() - 1); + } if (rr.version() < 0) { return TMsgPqCodes( @@ -116,8 +128,13 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ); } - config->AddReadRuleVersions(rr.version()); - auto ct = config->AddConsumerCodecs(); + consumer->SetVersion(rr.version()); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRuleVersions(rr.version()); + } + + auto* cct = consumer->MutableCodec(); + auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr; if (rr.supported_codecs().size() > MAX_SUPPORTED_CODECS_COUNT) { return TMsgPqCodes( TStringBuilder() << "supported_codecs count cannot be more than " @@ -131,8 +148,15 @@ namespace NKikimr::NGRpcProxy::V1 { TStringBuilder() << "Unknown codec with value " << codec << " for " << rr.consumer_name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ); - ct->AddIds(codec - 1); - ct->AddCodecs(to_lower(Ydb::PersQueue::V1::Codec_Name((Ydb::PersQueue::V1::Codec)codec)).substr(6)); + + auto codecName = to_lower(Ydb::PersQueue::V1::Codec_Name((Ydb::PersQueue::V1::Codec)codec)).substr(6); + + cct->AddIds(codec - 1); + cct->AddCodecs(codecName); + + if (NPQ::ReadRuleCompatible()) { + ct->CopyFrom(*cct); + } } if (rr.important()) { @@ -142,7 +166,10 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ); } - config->MutablePartitionConfig()->AddImportantClientId(consumerName); + consumer->SetImportant(true); + if (NPQ::ReadRuleCompatible()) { + config->MutablePartitionConfig()->AddImportantClientId(consumerName); + } } if (!rr.service_type().empty()) { @@ -153,7 +180,10 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ); } - config->AddReadRuleServiceTypes(rr.service_type()); + consumer->SetServiceType(rr.service_type()); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRuleServiceTypes(rr.service_type()); + } } else { const auto& pqConfig = AppData(ctx)->PQConfig; if (pqConfig.GetDisallowDefaultClientServiceType()) { @@ -163,7 +193,10 @@ namespace NKikimr::NGRpcProxy::V1 { ); } const auto& defaultCientServiceType = pqConfig.GetDefaultClientServiceType().GetName(); - config->AddReadRuleServiceTypes(defaultCientServiceType); + consumer->SetServiceType(defaultCientServiceType); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRuleServiceTypes(defaultCientServiceType); + } } return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); } @@ -205,7 +238,12 @@ namespace NKikimr::NGRpcProxy::V1 { } } - config->AddReadRules(consumerName); + auto* consumer = config->AddConsumers(); + + consumer->SetName(consumerName); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRules(consumerName); + } if (rr.read_from().seconds() < 0) { return TMsgPqCodes( @@ -213,9 +251,16 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ); } - config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000); + consumer->SetReadFromTimestampsMs(rr.read_from().seconds() * 1000); + if (NPQ::ReadRuleCompatible()) { + config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000); + } + + consumer->SetFormatVersion(0); + if (NPQ::ReadRuleCompatible()) { + config->AddConsumerFormatVersions(0); + } - config->AddConsumerFormatVersions(0); TString serviceType; const auto& pqConfig = AppData(ctx)->PQConfig; @@ -276,10 +321,18 @@ namespace NKikimr::NGRpcProxy::V1 { } } - config->AddReadRuleServiceTypes(serviceType); - config->AddReadRuleVersions(version); + consumer->SetServiceType(serviceType); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRuleServiceTypes(serviceType); + } + + consumer->SetVersion(version); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRuleVersions(version); + } - auto ct = config->AddConsumerCodecs(); + auto* cct = consumer->MutableCodec(); + auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr; for(const auto& codec : rr.supported_codecs().codecs()) { if ((!Ydb::Topic::Codec_IsValid(codec) && codec < Ydb::Topic::CODEC_CUSTOM) || codec == 0) { @@ -288,15 +341,22 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ); } - ct->AddIds(codec - 1); - ct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM"); + cct->AddIds(codec - 1); + cct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM"); + + if (NPQ::ReadRuleCompatible()) { + ct->CopyFrom(*cct); + } } if (rr.important()) { if (pqConfig.GetTopicsAreFirstClassCitizen() && !AppData(ctx)->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) { return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } - config->MutablePartitionConfig()->AddImportantClientId(consumerName); + consumer->SetImportant(true); + if (NPQ::ReadRuleCompatible()) { + config->MutablePartitionConfig()->AddImportantClientId(consumerName); + } } return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); @@ -309,9 +369,6 @@ namespace NKikimr::NGRpcProxy::V1 { const TString& consumerName, const TActorContext& ctx ) { - THashSet rulesToRemove; - rulesToRemove.insert(consumerName); - config->ClearReadRuleVersions(); config->ClearReadRules(); config->ClearReadFromTimestampsMs(); @@ -319,42 +376,61 @@ namespace NKikimr::NGRpcProxy::V1 { config->ClearConsumerCodecs(); config->MutablePartitionConfig()->ClearImportantClientId(); config->ClearReadRuleServiceTypes(); + config->ClearConsumers(); - for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) { - if (rulesToRemove.find(importantConsumer) == rulesToRemove.end()) { - config->MutablePartitionConfig()->AddImportantClientId(importantConsumer); + if (NPQ::ReadRuleCompatible()) { + for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) { + if (importantConsumer != consumerName) { + config->MutablePartitionConfig()->AddImportantClientId(importantConsumer); + } } } + bool removed = false; + const auto& pqConfig = AppData(ctx)->PQConfig; - for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) { - if (auto it = rulesToRemove.find(originalConfig.GetReadRules(i)); it != rulesToRemove.end()) { - rulesToRemove.erase(it); - continue; - } + if (NPQ::ReadRuleCompatible()) { + for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) { + auto& readRule = originalConfig.GetReadRules(i); - config->AddReadRuleVersions(originalConfig.GetReadRuleVersions(i)); - config->AddReadRules(originalConfig.GetReadRules(i)); - config->AddReadFromTimestampsMs(originalConfig.GetReadFromTimestampsMs(i)); - config->AddConsumerFormatVersions(originalConfig.GetConsumerFormatVersions(i)); - auto ct = config->AddConsumerCodecs(); - for (size_t j = 0; j < originalConfig.GetConsumerCodecs(i).CodecsSize(); j++) { - ct->AddCodecs(originalConfig.GetConsumerCodecs(i).GetCodecs(j)); - ct->AddIds(originalConfig.GetConsumerCodecs(i).GetIds(j)); - } - if (i < originalConfig.ReadRuleServiceTypesSize()) { - config->AddReadRuleServiceTypes(originalConfig.GetReadRuleServiceTypes(i)); - } else { - if (pqConfig.GetDisallowDefaultClientServiceType()) { - return TStringBuilder() << "service type cannot be empty for consumer '" - << originalConfig.GetReadRules(i) << "'"; + if (readRule == consumerName) { + removed = true; + continue; + } + + config->AddReadRuleVersions(originalConfig.GetReadRuleVersions(i)); + config->AddReadRules(readRule); + config->AddReadFromTimestampsMs(originalConfig.GetReadFromTimestampsMs(i)); + config->AddConsumerFormatVersions(originalConfig.GetConsumerFormatVersions(i)); + auto* ct = config->AddConsumerCodecs(); + for (size_t j = 0; j < originalConfig.GetConsumerCodecs(i).CodecsSize(); j++) { + ct->AddCodecs(originalConfig.GetConsumerCodecs(i).GetCodecs(j)); + ct->AddIds(originalConfig.GetConsumerCodecs(i).GetIds(j)); + } + if (i < originalConfig.ReadRuleServiceTypesSize()) { + config->AddReadRuleServiceTypes(originalConfig.GetReadRuleServiceTypes(i)); + } else { + if (pqConfig.GetDisallowDefaultClientServiceType()) { + return TStringBuilder() << "service type cannot be empty for consumer '" + << readRule << "'"; + } + config->AddReadRuleServiceTypes(pqConfig.GetDefaultClientServiceType().GetName()); } - config->AddReadRuleServiceTypes(pqConfig.GetDefaultClientServiceType().GetName()); } } - if (rulesToRemove.size() > 0) { - return TStringBuilder() << "Rule for consumer " << *rulesToRemove.begin() << " doesn't exist"; + for (auto& consumer : originalConfig.GetConsumers()) { + if (consumerName == consumer.GetName()) { + removed = true; + continue; + } + + auto* dst = config->AddConsumers(); + dst->CopyFrom(consumer); + } + + if (!removed) { + return TStringBuilder() << "Rule for consumer " << consumerName << " doesn't exist"; } return ""; @@ -364,27 +440,28 @@ namespace NKikimr::NGRpcProxy::V1 { const TClientServiceTypes& supportedClientServiceTypes, TString& error, const TActorContext& ctx) { - if (config.GetReadRules().size() > MAX_READ_RULES_COUNT) { + size_t consumerCount = NPQ::ConsumerCount(config); + if (consumerCount > MAX_READ_RULES_COUNT) { error = TStringBuilder() << "read rules count cannot be more than " - << MAX_READ_RULES_COUNT << ", provided " << config.GetReadRules().size(); + << MAX_READ_RULES_COUNT << ", provided " << consumerCount; return false; } THashSet readRuleConsumers; - for (auto consumerName : config.GetReadRules()) { - if (readRuleConsumers.find(consumerName) != readRuleConsumers.end()) { - error = TStringBuilder() << "Duplicate consumer name " << consumerName; + for (auto consumer : config.GetConsumers()) { + if (readRuleConsumers.find(consumer.GetName()) != readRuleConsumers.end()) { + error = TStringBuilder() << "Duplicate consumer name " << consumer.GetName(); return true; } - readRuleConsumers.insert(consumerName); + readRuleConsumers.insert(consumer.GetName()); } for (const auto& t : supportedClientServiceTypes) { auto type = t.first; - auto count = std::count_if(config.GetReadRuleServiceTypes().begin(), config.GetReadRuleServiceTypes().end(), - [type](const TString& cType){ - return type == cType; + auto count = std::count_if(config.GetConsumers().begin(), config.GetConsumers().end(), + [type](const auto& c){ + return type == c.GetServiceType(); }); auto limit = t.second.MaxCount; if (count > limit) { @@ -393,13 +470,12 @@ namespace NKikimr::NGRpcProxy::V1 { } } if (config.GetCodecs().IdsSize() > 0) { - for (ui32 i = 0; i < config.ConsumerCodecsSize(); ++i) { - TString name = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx); + for (const auto& consumer : config.GetConsumers()) { + TString name = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx); - auto& consumerCodecs = config.GetConsumerCodecs(i); - if (consumerCodecs.IdsSize() > 0) { + if (consumer.GetCodec().IdsSize() > 0) { THashSet codecs; - for (auto& cc : consumerCodecs.GetIds()) { + for (auto& cc : consumer.GetCodec().GetIds()) { codecs.insert(cc); } for (auto& cc : config.GetCodecs().GetIds()) { @@ -1108,6 +1184,7 @@ namespace NKikimr::NGRpcProxy::V1 { auto config = pqDescr.MutablePQTabletConfig(); + NPQ::Migrate(*config); auto partConfig = config->MutablePartitionConfig(); if (request.alter_attributes().size()) { @@ -1186,11 +1263,11 @@ namespace NKikimr::NGRpcProxy::V1 { i32 dropped = 0; - for (ui32 i = 0; i < config->ReadRulesSize(); ++i) { - TString oldName = config->GetReadRules(i); - TString name = NPersQueue::ConvertOldConsumerName(oldName, ctx); + for (const auto& c : config->GetConsumers()) { + auto& oldName = c.GetName(); + auto name = NPersQueue::ConvertOldConsumerName(oldName, ctx); + bool erase = false; - bool important = false; for (auto consumer: request.drop_consumers()) { if (consumer == name || consumer == oldName) { erase = true; @@ -1199,20 +1276,15 @@ namespace NKikimr::NGRpcProxy::V1 { } } if (erase) continue; - for (auto imp : partConfig->GetImportantClientId()) { - if (imp == oldName) { - important = true; - break; - } - } + consumers.push_back({false, Ydb::Topic::Consumer{}}); // do not check service type for presented consumers auto& consumer = consumers.back().second; consumer.set_name(name); - consumer.set_important(important); - consumer.mutable_read_from()->set_seconds(config->GetReadFromTimestampsMs(i) / 1000); - (*consumer.mutable_attributes())["_service_type"] = config->GetReadRuleServiceTypes(i); - (*consumer.mutable_attributes())["_version"] = TStringBuilder() << config->GetReadRuleVersions(i); - for (ui32 codec : config->GetConsumerCodecs(i).GetIds()) { + consumer.set_important(c.GetImportant()); + consumer.mutable_read_from()->set_seconds(c.GetReadFromTimestampsMs() / 1000); + (*consumer.mutable_attributes())["_service_type"] = c.GetServiceType(); + (*consumer.mutable_attributes())["_version"] = TStringBuilder() << c.GetVersion(); + for (ui32 codec : c.GetCodec().GetIds()) { consumer.mutable_supported_codecs()->add_codecs(codec + 1); } } @@ -1252,6 +1324,7 @@ namespace NKikimr::NGRpcProxy::V1 { config->ClearReadRuleServiceTypes(); config->ClearReadRuleGenerations(); config->ClearReadRuleVersions(); + config->ClearConsumers(); for (const auto& rr : consumers) { auto messageAndCode = AddReadRuleToConfig(config, rr.second, supportedClientServiceTypes, rr.first, ctx); diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index a2a7c0e6f069..b47c98f1e2e6 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -4,6 +4,7 @@ #include "persqueue_utils.h" #include +#include namespace NKikimr::NGRpcProxy::V1 { @@ -197,15 +198,8 @@ bool TReadInitAndAuthActor::CheckTopicACL( return false; } if (!SkipReadRuleCheck && (Token || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen())) { - bool found = false; - for (auto& cons : pqDescr.GetPQTabletConfig().GetReadRules() ) { - if (cons == ClientId) { - found = true; - break; - } - } //TODO : add here checking of client-service-type password. Provide it via API-call. - if (!found) { + if (!NPQ::HasConsumer(pqDescr.GetPQTabletConfig(), ClientId)) { CloseSession( TStringBuilder() << "no read rule provided for consumer '" << ClientPath << "' in topic '" << topic << "' in current cluster '" << LocalCluster, PersQueue::ErrorCode::BAD_REQUEST, ctx diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 636f7b4e2041..2f043e4f1670 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -2,10 +2,10 @@ #include "persqueue_utils.h" +#include +#include #include - #include -#include namespace NKikimr::NGRpcProxy::V1 { @@ -134,28 +134,22 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T } const auto& pqConfig = AppData(ActorContext())->PQConfig; - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { + + for (const auto& consumer : config.GetConsumers()) { auto rr = settings->add_read_rules(); - auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ActorContext()); + auto consumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ActorContext()); rr->set_consumer_name(consumerName); - rr->set_starting_message_timestamp_ms(config.GetReadFromTimestampsMs(i)); + rr->set_starting_message_timestamp_ms(consumer.GetReadFromTimestampsMs()); rr->set_supported_format( - (Ydb::PersQueue::V1::TopicSettings::Format) (config.GetConsumerFormatVersions(i) + 1)); - rr->set_version(config.GetReadRuleVersions(i)); - for (const auto &codec : config.GetConsumerCodecs(i).GetIds()) { + (Ydb::PersQueue::V1::TopicSettings::Format) (consumer.GetFormatVersion() + 1)); + rr->set_version(consumer.GetVersion()); + for (const auto &codec : consumer.GetCodec().GetIds()) { rr->add_supported_codecs((Ydb::PersQueue::V1::Codec) (codec + 1)); } - bool important = false; - for (const auto &c : partConfig.GetImportantClientId()) { - if (c == config.GetReadRules(i)) { - important = true; - break; - } - } - rr->set_important(important); + rr->set_important(consumer.GetImportant()); - if (i < config.ReadRuleServiceTypesSize()) { - rr->set_service_type(config.GetReadRuleServiceTypes(i)); + if (consumer.HasServiceType()) { + rr->set_service_type(consumer.GetServiceType()); } else { if (pqConfig.GetDisallowDefaultClientServiceType()) { this->Request_->RaiseIssue(FillIssue( @@ -1022,38 +1016,30 @@ bool TDescribeConsumerActor::ApplyResponse( const auto& location = record.GetLocations(i); auto* locationResult = Result.mutable_partitions(i)->mutable_partition_location(); SetPartitionLocation(location, locationResult); - } return true; } -bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, ui32 i, +bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, const NKikimrPQ::TPQTabletConfig::TConsumer& consumer, const NActors::TActorContext& ctx, Ydb::StatusIds::StatusCode& status, TString& error) { - const auto &partConfig = config.GetPartitionConfig(); const auto& pqConfig = AppData(ctx)->PQConfig; - auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx); + auto consumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx); rr->set_name(consumerName); - rr->mutable_read_from()->set_seconds(config.GetReadFromTimestampsMs(i) / 1000); - auto version = config.GetReadRuleVersions(i); + rr->mutable_read_from()->set_seconds(consumer.GetReadFromTimestampsMs() / 1000); + auto version = consumer.GetVersion(); if (version != 0) (*rr->mutable_attributes())["_version"] = TStringBuilder() << version; - for (const auto &codec : config.GetConsumerCodecs(i).GetIds()) { + for (const auto &codec : consumer.GetCodec().GetIds()) { rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1)); } - bool important = false; - for (const auto &c : partConfig.GetImportantClientId()) { - if (c == config.GetReadRules(i)) { - important = true; - break; - } - } - rr->set_important(important); + + rr->set_important(consumer.GetImportant()); TString serviceType = ""; - if (i < config.ReadRuleServiceTypesSize()) { - serviceType = config.GetReadRuleServiceTypes(i); + if (consumer.HasServiceType()) { + serviceType = consumer.GetServiceType(); } else { if (pqConfig.GetDisallowDefaultClientServiceType()) { error = "service type must be set for all read rules"; @@ -1155,12 +1141,14 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv } auto consumerName = NPersQueue::ConvertNewConsumerName(Settings.Consumer, ActorContext()); bool found = false; - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - if (consumerName == config.GetReadRules(i)) found = true; + for (const auto& consumer : config.GetConsumers()) { + if (consumerName == consumer.GetName()) { + found = true; + } auto rr = Result.add_consumers(); Ydb::StatusIds::StatusCode status; TString error; - if (!FillConsumerProto(rr, config, i, ActorContext(), status, error)) { + if (!FillConsumerProto(rr, config, consumer, ActorContext(), status, error)) { return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ActorContext()); } } @@ -1210,14 +1198,16 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache:: auto consumerName = NPersQueue::ConvertNewConsumerName(Settings.Consumer, ActorContext()); bool found = false; - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - if (consumerName != config.GetReadRules(i)) + for (const auto& consumer : config.GetConsumers()) { + if (consumerName != consumer.GetName()) { continue; + } found = true; + auto rr = Result.mutable_consumer(); Ydb::StatusIds::StatusCode status; TString error; - if (!FillConsumerProto(rr, config, i, ActorContext(), status, error)) { + if (!FillConsumerProto(rr, config, consumer, ActorContext(), status, error)) { return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ActorContext()); } break; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 312604fe48c2..1f50d9a6b734 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -5020,10 +5020,35 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { ReadRuleVersions: 567 TopicPath: "/Root/PQ/rt3.dc1--acc--topic3" YdbDatabasePath: "/Root" + Consumers { + Name: "first-consumer" + ReadFromTimestampsMs: 11223344000 + FormatVersion: 0 + Codec { + } + ServiceType: "data-streams" + Version: 0 + } + Consumers { + Name: "consumer" + ReadFromTimestampsMs: 111000 + FormatVersion: 0 + Codec { + Ids: 2 + Ids: 10004 + Codecs: "lzop" + Codecs: "CUSTOM" + } + ServiceType: "data-streams" + Version: 567 + } } ErrorCode: OK } )___"; + + Cerr << ">>>>> " << res.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(res.DebugString(), resultDescribe); Cerr << "DESCRIBES:\n"; diff --git a/ydb/services/persqueue_v1/topic_yql_ut.cpp b/ydb/services/persqueue_v1/topic_yql_ut.cpp index 9d2a1734db96..9c0d45eae52d 100644 --- a/ydb/services/persqueue_v1/topic_yql_ut.cpp +++ b/ydb/services/persqueue_v1/topic_yql_ut.cpp @@ -67,6 +67,8 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) { partCfg->SetWriteSpeedInBytesPerSecond(9001); auto* rtfs = descrCopy.MutableReadFromTimestampsMs(); rtfs->Set(1, 1609462861000); + + descrCopy.MutableConsumers(1)->SetReadFromTimestampsMs(1609462861000); } const char *query2 = R"__( ALTER TOPIC `/Root/PQ/rt3.dc1--legacy--topic1` @@ -80,6 +82,10 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) { auto pqGroup2 = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription() .GetPersQueueGroup(); const auto& descr2 = pqGroup2.GetPQTabletConfig(); + + Cerr << ">>>>> 1: " << descrCopy.DebugString() << Endl; + Cerr << ">>>>> 2: " << descr2.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(descrCopy.DebugString(), descr2.DebugString()); const char *query3 = R"__(