Skip to content

Commit

Permalink
replace ImportantClientIds
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Feb 29, 2024
1 parent 01a27bb commit b5badc1
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 32 deletions.
16 changes: 10 additions & 6 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,13 @@ ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
}

ui64 TPartition::ImportantClientsMinOffset() const {
const auto& partConfig = Config.GetPartitionConfig();

ui64 minOffset = EndOffset;
for (const auto& importantClientId : partConfig.GetImportantClientId()) {
const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId);
for (const auto& consumer : Config.GetConsumers()) {
if (!consumer.GetImportant()) {
continue;
}

const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName());
ui64 curOffset = StartOffset;
if (userInfo && userInfo->Offset >= 0) //-1 means no offset
curOffset = userInfo->Offset;
Expand Down Expand Up @@ -1773,8 +1775,10 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
}

TSet<TString> important;
for (const auto& importantUser : config.GetPartitionConfig().GetImportantClientId()) {
important.insert(importantUser);
for (const auto& consumer : config.GetConsumers()) {
if (consumer.GetImportant()) {
important.insert(consumer.GetName());
}
}

for (auto& consumer : config.GetConsumers()) {
Expand Down
15 changes: 10 additions & 5 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,27 @@ void TPartition::Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr&

void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
TSet<TString> important;
for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) {
important.insert(importantUser);
TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantUser);
for (const auto& consumer : Config.GetConsumers()) {
if (!consumer.GetImportant()) {
continue;
}

important.insert(consumer.GetName());

TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName());
if (userInfo && !userInfo->Important && userInfo->LabeledCounters) {
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup()));
userInfo->SetImportant(true);
continue;
}
if (!userInfo) {
userInfo = &UsersInfoStorage->Create(
ctx, importantUser, 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}
ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}
);
}
if (userInfo->Offset < (i64)StartOffset)
userInfo->Offset = StartOffset;
ReadTimestampForOffset(importantUser, *userInfo, ctx);
ReadTimestampForOffset(consumer.GetName(), *userInfo, ctx);
}
for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) {
Expand Down
21 changes: 11 additions & 10 deletions ydb/core/persqueue/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ ui64 PutUnitsSize(const ui64 size) {
return putUnitsCount;
}

bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) {
for (const auto& i : config.GetPartitionConfig().GetImportantClientId()) {
if (consumerName == i) {
return true;
}
}

return false;
}

void Migrate(NKikimrPQ::TPQTabletConfig& config) {
// if ReadRules isn`t empty than it is old configuration format
// when modify new format (add or alter a consumer) readRules is cleared
Expand Down Expand Up @@ -75,6 +85,7 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) {
if (i < config.ReadRuleGenerationsSize()) {
consumer->SetGeneration(config.GetReadRuleGenerations(i));
}
consumer->SetImportant(IsImportantClient(config, consumer.GetName()));
}
}
}
Expand All @@ -93,16 +104,6 @@ size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config) {
return config.ConsumersSize();
}

bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) {
for (const auto& i : config.GetPartitionConfig().GetImportantClientId()) {
if (consumerName == i) {
return true;
}
}

return false;
}

const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId) {
for(const auto& p : config.GetPartitions()) {
if (partitionId == p.GetPartitionId()) {
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/persqueue/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ constexpr bool ReadRuleCompatible() { return true; }
bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName);
size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config);

bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName);

const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId);

// The graph of split-merge operations.
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/protos/pqconfig.proto
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ message TPartitionConfig {
optional uint64 StorageLimitBytes = 16;

// List of ClientIds, for which we don't delete data until they are read by these clients
repeated string ImportantClientId = 4; //can be empty
repeated string ImportantClientId = 4; //can be empty . Deprecated. Use Consumer.Important
optional uint32 LowWatermark = 5 [default = 6291456]; //6Mb, compact blobs if they at least this big.
optional uint32 SourceIdLifetimeSeconds = 6 [ default = 1382400]; //16 days
optional uint32 SourceIdMaxCounts = 31 [default = 6000000]; // Maximum number of stored sourceId records in partition
Expand Down Expand Up @@ -332,6 +332,7 @@ message TPQTabletConfig {
optional EConsumerScalingSupport ScalingSupport = 6;
optional uint64 Version = 7 [default = 0];
optional uint64 Generation = 8 [default = 0];
optional bool Important = 9 [default = false];
}

repeated TConsumer Consumers = 37;
Expand Down
20 changes: 14 additions & 6 deletions ydb/services/lib/actors/pq_schema_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ namespace NKikimr::NGRpcProxy::V1 {
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
);
}
config->MutablePartitionConfig()->AddImportantClientId(consumerName);
consumer->SetImportant(true);
if (NPQ::ReadRuleCompatible()) {
config->MutablePartitionConfig()->AddImportantClientId(consumerName);
}
}

if (!rr.service_type().empty()) {
Expand Down Expand Up @@ -350,7 +353,10 @@ namespace NKikimr::NGRpcProxy::V1 {
if (pqConfig.GetTopicsAreFirstClassCitizen() && !AppData(ctx)->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) {
return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
config->MutablePartitionConfig()->AddImportantClientId(consumerName);
consumer->SetImportant(true);
if (NPQ::ReadRuleCompatible()) {
config->MutablePartitionConfig()->AddImportantClientId(consumerName);
}
}

return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK);
Expand All @@ -372,9 +378,11 @@ namespace NKikimr::NGRpcProxy::V1 {
config->ClearReadRuleServiceTypes();
config->ClearConsumers();

for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) {
if (importantConsumer != consumerName) {
config->MutablePartitionConfig()->AddImportantClientId(importantConsumer);
if (NPQ::ReadRuleCompatible()) {
for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) {
if (importantConsumer != consumerName) {
config->MutablePartitionConfig()->AddImportantClientId(importantConsumer);
}
}
}

Expand Down Expand Up @@ -1272,7 +1280,7 @@ namespace NKikimr::NGRpcProxy::V1 {
consumers.push_back({false, Ydb::Topic::Consumer{}}); // do not check service type for presented consumers
auto& consumer = consumers.back().second;
consumer.set_name(name);
consumer.set_important(NPQ::IsImportantClient(*config, oldName));
consumer.set_important(c.GetImportant());
consumer.mutable_read_from()->set_seconds(c.GetReadFromTimestampsMs() / 1000);
(*consumer.mutable_attributes())["_service_type"] = c.GetServiceType();
(*consumer.mutable_attributes())["_version"] = TStringBuilder() << c.GetVersion();
Expand Down
4 changes: 2 additions & 2 deletions ydb/services/persqueue_v1/actors/schema_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T
for (const auto &codec : consumer.GetCodec().GetIds()) {
rr->add_supported_codecs((Ydb::PersQueue::V1::Codec) (codec + 1));
}
rr->set_important(NPQ::IsImportantClient(config, consumer.GetName()));
rr->set_important(consumer.GetImportant());

if (consumer.HasServiceType()) {
rr->set_service_type(consumer.GetServiceType());
Expand Down Expand Up @@ -1036,7 +1036,7 @@ bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfi
rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1));
}

rr->set_important(NPQ::IsImportantClient(config, consumer.GetName()));
rr->set_important(consumer.GetImportant());
TString serviceType = "";
if (consumer.HasServiceType()) {
serviceType = consumer.GetServiceType();
Expand Down

0 comments on commit b5badc1

Please sign in to comment.