Skip to content

Commit

Permalink
Merge 33a6c2d into c30bbc4
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Mar 1, 2024
2 parents c30bbc4 + 33a6c2d commit 5da27cd
Show file tree
Hide file tree
Showing 24 changed files with 415 additions and 233 deletions.
12 changes: 2 additions & 10 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "kqp_topics.h"

#include <ydb/core/base/path.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/library/actors/core/log.h>

#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
Expand Down Expand Up @@ -294,16 +295,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
result.PQGroupInfo->Description;

if (Consumer_) {
bool found = false;

for (auto& consumer : description.GetPQTabletConfig().GetReadRules()) {
if (Consumer_ == consumer) {
found = true;
break;
}
}

if (!found) {
if (!NPQ::HasConsumer(description.GetPQTabletConfig(), *Consumer_)) {
builder << "Unknown consumer '" << *Consumer_ << "'";

status = Ydb::StatusIds::BAD_REQUEST;
Expand Down
35 changes: 18 additions & 17 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,13 @@ ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
}

ui64 TPartition::ImportantClientsMinOffset() const {
const auto& partConfig = Config.GetPartitionConfig();

ui64 minOffset = EndOffset;
for (const auto& importantClientId : partConfig.GetImportantClientId()) {
const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId);
for (const auto& consumer : Config.GetConsumers()) {
if (!consumer.GetImportant()) {
continue;
}

const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName());
ui64 curOffset = StartOffset;
if (userInfo && userInfo->Offset >= 0) //-1 means no offset
curOffset = userInfo->Offset;
Expand Down Expand Up @@ -544,7 +546,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
InitDone = true;
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());

FillReadFromTimestamps(Config, ctx);
FillReadFromTimestamps(ctx);
ResendPendingEvents(ctx);
ProcessTxsAndUserActs(ctx);

Expand Down Expand Up @@ -1791,29 +1793,30 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
}

TSet<TString> important;
for (const auto& importantUser : config.GetPartitionConfig().GetImportantClientId()) {
important.insert(importantUser);
for (const auto& consumer : config.GetConsumers()) {
if (consumer.GetImportant()) {
important.insert(consumer.GetName());
}
}

for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
auto& userInfo = GetOrCreatePendingUser(consumer, 0);
for (auto& consumer : config.GetConsumers()) {
auto& userInfo = GetOrCreatePendingUser(consumer.GetName(), 0);

TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs());
if (!ts) {
ts += TDuration::MilliSeconds(1);
}
userInfo.ReadFromTimestamp = ts;
userInfo.Important = important.contains(consumer);
userInfo.Important = important.contains(consumer.GetName());

ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
ui64 rrGen = consumer.GetGeneration();
if (userInfo.ReadRuleGeneration != rrGen) {
TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0, 0, TActorId{},
TEvPQ::TEvSetClientInfo act(0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{},
TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen);

ProcessUserAct(act, ctx);
}
hasReadRule.erase(consumer);
hasReadRule.erase(consumer.GetName());
}

for (auto& consumer : hasReadRule) {
Expand Down Expand Up @@ -1915,8 +1918,6 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf

Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0);

UsersInfoStorage->UpdateConfig(Config);

Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
void CreateMirrorerActor();
void DoRead(TEvPQ::TEvRead::TPtr&& ev, TDuration waitQuotaTime, const TActorContext& ctx);
void FailBadClient(const TActorContext& ctx);
void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx);
void FillReadFromTimestamps(const TActorContext& ctx);
void FilterDeadlinedWrites(const TActorContext& ctx);

void Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& ev, const TActorContext& ctx);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
switch (response.GetStatus()) {
case NKikimrProto::OK:
Y_ABORT_UNLESS(Partition()->Config.ParseFromString(response.GetValue()));

Migrate(Partition()->Config);

if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) {
auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter,
Partition()->TabletConfig);
Expand Down
34 changes: 20 additions & 14 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@ void TPartition::SendReadingFinished(const TString& consumer) {
Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId));
}

void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) {
void TPartition::FillReadFromTimestamps(const TActorContext& ctx) {
TSet<TString> hasReadRule;

for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
userInfo.ReadFromTimestamp = TInstant::Zero();
userInfo.HasReadRule = false;
hasReadRule.insert(consumer);
}
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0);

for (auto& consumer : Config.GetConsumers()) {
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer.GetName(), ctx, 0);
userInfo.HasReadRule = true;
ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
if (userInfo.ReadRuleGeneration != rrGen) {

if (userInfo.ReadRuleGeneration != consumer.GetGeneration()) {
THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(
0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen
0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, consumer.GetGeneration()
);
//
// TODO(abcdef): заменить на вызов ProcessUserAct
Expand All @@ -61,12 +61,13 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
}
userInfo.Step = userInfo.Generation = 0;
}
hasReadRule.erase(consumer);
TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
hasReadRule.erase(consumer.GetName());
TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs());
if (!ts) ts += TDuration::MilliSeconds(1);
if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts)
userInfo.ReadFromTimestamp = ts;
}

for (auto& consumer : hasReadRule) {
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx);
if (userInfo.NoConsumer) {
Expand Down Expand Up @@ -179,22 +180,27 @@ void TPartition::Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr&

void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
TSet<TString> important;
for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) {
important.insert(importantUser);
TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantUser);
for (const auto& consumer : Config.GetConsumers()) {
if (!consumer.GetImportant()) {
continue;
}

important.insert(consumer.GetName());

TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName());
if (userInfo && !userInfo->Important && userInfo->LabeledCounters) {
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup()));
userInfo->SetImportant(true);
continue;
}
if (!userInfo) {
userInfo = &UsersInfoStorage->Create(
ctx, importantUser, 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}
ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}
);
}
if (userInfo->Offset < (i64)StartOffset)
userInfo->Offset = StartOffset;
ReadTimestampForOffset(importantUser, *userInfo, ctx);
ReadTimestampForOffset(consumer.GetName(), *userInfo, ctx);
}
for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) {
Expand Down
35 changes: 22 additions & 13 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "partition_log.h"
#include "partition.h"
#include "read.h"
#include "utils.h"

#include <ydb/core/base/tx_processing.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/persqueue/config/config.h>
Expand Down Expand Up @@ -1013,6 +1015,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
bool res = Config.ParseFromString(read.GetValue());
Y_ABORT_UNLESS(res);

Migrate(Config);

if (!Config.PartitionsSize()) {
for (const auto partitionId : Config.GetPartitionIds()) {
Config.AddPartitions()->SetPartitionId(partitionId);
Expand Down Expand Up @@ -1145,10 +1149,12 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) {
}

auto countReadRulesWithPricing = [&](const TActorContext& ctx, const auto& config) {
auto& defaultClientServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName();

ui32 result = 0;
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
TString rrServiceType = config.ReadRuleServiceTypesSize() <= i ? "" : config.GetReadRuleServiceTypes(i);
if (rrServiceType.empty() || rrServiceType == AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName())
for (auto& consumer : config.GetConsumers()) {
TString serviceType = consumer.GetServiceType();
if (serviceType.empty() || serviceType == defaultClientServiceType)
++result;
}
return result;
Expand Down Expand Up @@ -1618,24 +1624,27 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
if (!cfg.HasCacheSize() && Config.HasCacheSize()) //if not set and it is alter - preserve old cache size
cfg.SetCacheSize(Config.GetCacheSize());

Migrate(cfg);

// set rr generation for provided read rules
{
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) {
auto version = i < Config.ReadRuleVersionsSize() ? Config.GetReadRuleVersions(i) : 0;
auto generation = i < Config.ReadRuleGenerationsSize() ? Config.GetReadRuleGenerations(i) : 0;
existed[Config.GetReadRules(i)] = std::make_pair(version, generation);
for (const auto& c : Config.GetConsumers()) {
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
}
for (ui32 i = 0; i < cfg.ReadRulesSize(); ++i) {
auto version = i < cfg.ReadRuleVersionsSize() ? cfg.GetReadRuleVersions(i) : 0;
auto it = existed.find(cfg.GetReadRules(i));

for (auto& c : *cfg.MutableConsumers()) {
auto it = existed.find(c.GetName());
ui64 generation = 0;
if (it != existed.end() && it->second.first == version) {
if (it != existed.end() && it->second.first == c.GetVersion()) {
generation = it->second.second;
} else {
generation = curConfigVersion;
}
cfg.AddReadRuleGenerations(generation);
c.SetGeneration(generation);
if (ReadRuleCompatible()) {
cfg.AddReadRuleGenerations(generation);
}
}
}

Expand All @@ -1648,7 +1657,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf

BeginWriteConfig(cfg, bootstrapCfg, ctx);

NewConfig = cfg;
NewConfig = std::move(cfg);
}

void TPersQueue::BeginWriteConfig(const NKikimrPQ::TPQTabletConfig& cfg,
Expand Down
32 changes: 14 additions & 18 deletions ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,12 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA
if (!config.empty()) {
bool res = Self->TabletConfig.ParseFromString(config);
Y_ABORT_UNLESS(res);

Migrate(Self->TabletConfig);
Self->Consumers.clear();

if (Self->TabletConfig.ReadRulesSize() == Self->TabletConfig.ConsumerScalingSupportSize()) {
for (size_t i = 0; i < Self->TabletConfig.ReadRulesSize(); ++i) {
Self->Consumers[Self->TabletConfig.GetReadRules(i)].ScalingSupport = Self->TabletConfig.GetConsumerScalingSupport(i);
}
} else {
for (const auto& rr : Self->TabletConfig.GetReadRules()) {
Self->Consumers[rr].ScalingSupport = DefaultScalingSupport();
}
for (auto& consumer : Self->TabletConfig.GetConsumers()) {
Self->Consumers[consumer.GetName()].ScalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport();
}

Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);
Expand Down Expand Up @@ -532,10 +528,12 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
Version = record.GetVersion();
MaxPartsPerTablet = record.GetPartitionPerTablet();
PathId = record.GetPathId();
Topic = record.GetTopicName();
Path = record.GetPath();
Topic = std::move(record.GetTopicName());
Path = std::move(record.GetPath());
TxId = record.GetTxId();
TabletConfig = record.GetTabletConfig();
TabletConfig = std::move(record.GetTabletConfig());
Migrate(TabletConfig);

SchemeShardId = record.GetSchemeShardId();
TotalGroups = record.HasTotalGroupCount() ? record.GetTotalGroupCount() : 0;
ui32 prevNextPartitionId = NextPartitionId;
Expand All @@ -547,17 +545,15 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr

auto oldConsumers = std::move(Consumers);
Consumers.clear();
for (size_t i = 0; i < TabletConfig.ReadRulesSize(); ++i) {
auto& rr = TabletConfig.GetReadRules(i);
for (auto& consumer : TabletConfig.GetConsumers()) {
auto scalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport();

auto scalingSupport = i < TabletConfig.ConsumerScalingSupportSize() ? TabletConfig.GetConsumerScalingSupport(i)
: DefaultScalingSupport();
auto it = oldConsumers.find(rr);
auto it = oldConsumers.find(consumer.GetName());
if (it != oldConsumers.end()) {
auto& c = Consumers[rr] = std::move(it->second);
auto& c = Consumers[consumer.GetName()] = std::move(it->second);
c.ScalingSupport = scalingSupport;
} else {
Consumers[rr].ScalingSupport = scalingSupport;
Consumers[consumer.GetName()].ScalingSupport = scalingSupport;
}
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransactio
TabletConfig = tx.GetTabletConfig();
BootstrapConfig = tx.GetBootstrapConfig();

Migrate(TabletConfig);

InitPartitions();
}

Expand Down Expand Up @@ -157,6 +159,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();

Migrate(TabletConfig);

TPartitionGraph graph = MakePartitionGraph(TabletConfig);

for (const auto& p : TabletConfig.GetPartitions()) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/user_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx,
{
TString defaultServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName();
TString userServiceType = "";
for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) {
if (Config.GetReadRules(i) == user) {
userServiceType = Config.ReadRuleServiceTypesSize() > i ? Config.GetReadRuleServiceTypes(i) : "";
for (auto& consumer : Config.GetConsumers()) {
if (consumer.GetName() == user) {
userServiceType = consumer.GetServiceType();
break;
}
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/persqueue/user_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,6 @@ class TUsersInfoStorage {
const TUserInfo* GetIfExists(const TString& user) const;
TUserInfo* GetIfExists(const TString& user);

void UpdateConfig(const NKikimrPQ::TPQTabletConfig& config) {
Config = config;
}

THashMap<TString, TUserInfo>& GetAll();

TUserInfoBase CreateUserInfo(const TString& user,
Expand Down Expand Up @@ -422,7 +418,7 @@ class TUsersInfoStorage {

TMaybe<TActorId> TabletActor;
TMaybe<TActorId> PartitionActor;
NKikimrPQ::TPQTabletConfig Config;
const NKikimrPQ::TPQTabletConfig& Config;

TString CloudId;
TString DbId;
Expand Down
Loading

0 comments on commit 5da27cd

Please sign in to comment.