Skip to content

Commit

Permalink
Merge 0257895 into 73359a7
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Feb 29, 2024
2 parents 73359a7 + 0257895 commit 1cdc217
Show file tree
Hide file tree
Showing 24 changed files with 380 additions and 214 deletions.
12 changes: 2 additions & 10 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "kqp_topics.h"

#include <ydb/core/base/path.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/library/actors/core/log.h>

#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
Expand Down Expand Up @@ -294,16 +295,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
result.PQGroupInfo->Description;

if (Consumer_) {
bool found = false;

for (auto& consumer : description.GetPQTabletConfig().GetReadRules()) {
if (Consumer_ == consumer) {
found = true;
break;
}
}

if (!found) {
if (!NPQ::HasConsumer(description.GetPQTabletConfig(), *Consumer_)) {
builder << "Unknown consumer '" << *Consumer_ << "'";

status = Ydb::StatusIds::BAD_REQUEST;
Expand Down
19 changes: 8 additions & 11 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
InitDone = true;
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());

FillReadFromTimestamps(Config, ctx);
FillReadFromTimestamps(ctx);
ResendPendingEvents(ctx);
ProcessTxsAndUserActs(ctx);

Expand Down Expand Up @@ -1795,25 +1795,24 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
important.insert(importantUser);
}

for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
auto& userInfo = GetOrCreatePendingUser(consumer, 0);
for (auto& consumer : config.GetConsumers()) {
auto& userInfo = GetOrCreatePendingUser(consumer.GetName(), 0);

TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs());
if (!ts) {
ts += TDuration::MilliSeconds(1);
}
userInfo.ReadFromTimestamp = ts;
userInfo.Important = important.contains(consumer);
userInfo.Important = important.contains(consumer.GetName());

ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
ui64 rrGen = consumer.GetGeneration();
if (userInfo.ReadRuleGeneration != rrGen) {
TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0, 0, TActorId{},
TEvPQ::TEvSetClientInfo act(0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{},
TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen);

ProcessUserAct(act, ctx);
}
hasReadRule.erase(consumer);
hasReadRule.erase(consumer.GetName());
}

for (auto& consumer : hasReadRule) {
Expand Down Expand Up @@ -1915,8 +1914,6 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf

Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0);

UsersInfoStorage->UpdateConfig(Config);

Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
void CreateMirrorerActor();
void DoRead(TEvPQ::TEvRead::TPtr&& ev, TDuration waitQuotaTime, const TActorContext& ctx);
void FailBadClient(const TActorContext& ctx);
void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx);
void FillReadFromTimestamps(const TActorContext& ctx);
void FilterDeadlinedWrites(const TActorContext& ctx);

void Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& ev, const TActorContext& ctx);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
switch (response.GetStatus()) {
case NKikimrProto::OK:
Y_ABORT_UNLESS(Partition()->Config.ParseFromString(response.GetValue()));

Migrate(Partition()->Config);

if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) {
auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter,
Partition()->TabletConfig);
Expand Down
19 changes: 10 additions & 9 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@ void TPartition::SendReadingFinished(const TString& consumer) {
Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId));
}

void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) {
void TPartition::FillReadFromTimestamps(const TActorContext& ctx) {
TSet<TString> hasReadRule;

for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
userInfo.ReadFromTimestamp = TInstant::Zero();
userInfo.HasReadRule = false;
hasReadRule.insert(consumer);
}
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0);

for (auto& consumer : Config.GetConsumers()) {
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer.GetName(), ctx, 0);
userInfo.HasReadRule = true;
ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
if (userInfo.ReadRuleGeneration != rrGen) {

if (userInfo.ReadRuleGeneration != consumer.GetGeneration()) {
THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(
0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen
0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, consumer.GetGeneration()
);
//
// TODO(abcdef): заменить на вызов ProcessUserAct
Expand All @@ -61,12 +61,13 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
}
userInfo.Step = userInfo.Generation = 0;
}
hasReadRule.erase(consumer);
TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
hasReadRule.erase(consumer.GetName());
TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs());
if (!ts) ts += TDuration::MilliSeconds(1);
if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts)
userInfo.ReadFromTimestamp = ts;
}

for (auto& consumer : hasReadRule) {
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx);
if (userInfo.NoConsumer) {
Expand Down
35 changes: 22 additions & 13 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "partition_log.h"
#include "partition.h"
#include "read.h"
#include "utils.h"

#include <ydb/core/base/tx_processing.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/persqueue/config/config.h>
Expand Down Expand Up @@ -1013,6 +1015,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
bool res = Config.ParseFromString(read.GetValue());
Y_ABORT_UNLESS(res);

Migrate(Config);

if (!Config.PartitionsSize()) {
for (const auto partitionId : Config.GetPartitionIds()) {
Config.AddPartitions()->SetPartitionId(partitionId);
Expand Down Expand Up @@ -1145,10 +1149,12 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) {
}

auto countReadRulesWithPricing = [&](const TActorContext& ctx, const auto& config) {
auto& defaultClientServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName();

ui32 result = 0;
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
TString rrServiceType = config.ReadRuleServiceTypesSize() <= i ? "" : config.GetReadRuleServiceTypes(i);
if (rrServiceType.empty() || rrServiceType == AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName())
for (auto& consumer : config.GetConsumers()) {
TString serviceType = consumer.GetServiceType();
if (serviceType.empty() || serviceType == defaultClientServiceType)
++result;
}
return result;
Expand Down Expand Up @@ -1618,24 +1624,27 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
if (!cfg.HasCacheSize() && Config.HasCacheSize()) //if not set and it is alter - preserve old cache size
cfg.SetCacheSize(Config.GetCacheSize());

Migrate(cfg);

// set rr generation for provided read rules
{
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) {
auto version = i < Config.ReadRuleVersionsSize() ? Config.GetReadRuleVersions(i) : 0;
auto generation = i < Config.ReadRuleGenerationsSize() ? Config.GetReadRuleGenerations(i) : 0;
existed[Config.GetReadRules(i)] = std::make_pair(version, generation);
for (const auto& c : Config.GetConsumers()) {
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
}
for (ui32 i = 0; i < cfg.ReadRulesSize(); ++i) {
auto version = i < cfg.ReadRuleVersionsSize() ? cfg.GetReadRuleVersions(i) : 0;
auto it = existed.find(cfg.GetReadRules(i));

for (auto& c : *cfg.MutableConsumers()) {
auto it = existed.find(c.GetName());
ui64 generation = 0;
if (it != existed.end() && it->second.first == version) {
if (it != existed.end() && it->second.first == c.GetVersion()) {
generation = it->second.second;
} else {
generation = curConfigVersion;
}
cfg.AddReadRuleGenerations(generation);
c.SetGeneration(generation);
if (ReadRuleCompatible()) {
cfg.AddReadRuleGenerations(generation);
}
}
}

Expand All @@ -1648,7 +1657,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf

BeginWriteConfig(cfg, bootstrapCfg, ctx);

NewConfig = cfg;
NewConfig = std::move(cfg);
}

void TPersQueue::BeginWriteConfig(const NKikimrPQ::TPQTabletConfig& cfg,
Expand Down
32 changes: 14 additions & 18 deletions ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,12 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA
if (!config.empty()) {
bool res = Self->TabletConfig.ParseFromString(config);
Y_ABORT_UNLESS(res);

Migrate(Self->TabletConfig);
Self->Consumers.clear();

if (Self->TabletConfig.ReadRulesSize() == Self->TabletConfig.ConsumerScalingSupportSize()) {
for (size_t i = 0; i < Self->TabletConfig.ReadRulesSize(); ++i) {
Self->Consumers[Self->TabletConfig.GetReadRules(i)].ScalingSupport = Self->TabletConfig.GetConsumerScalingSupport(i);
}
} else {
for (const auto& rr : Self->TabletConfig.GetReadRules()) {
Self->Consumers[rr].ScalingSupport = DefaultScalingSupport();
}
for (auto& consumer : Self->TabletConfig.GetConsumers()) {
Self->Consumers[consumer.GetName()].ScalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport();
}

Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);
Expand Down Expand Up @@ -532,10 +528,12 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
Version = record.GetVersion();
MaxPartsPerTablet = record.GetPartitionPerTablet();
PathId = record.GetPathId();
Topic = record.GetTopicName();
Path = record.GetPath();
Topic = std::move(record.GetTopicName());
Path = std::move(record.GetPath());
TxId = record.GetTxId();
TabletConfig = record.GetTabletConfig();
TabletConfig = std::move(record.GetTabletConfig());
Migrate(TabletConfig);

SchemeShardId = record.GetSchemeShardId();
TotalGroups = record.HasTotalGroupCount() ? record.GetTotalGroupCount() : 0;
ui32 prevNextPartitionId = NextPartitionId;
Expand All @@ -547,17 +545,15 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr

auto oldConsumers = std::move(Consumers);
Consumers.clear();
for (size_t i = 0; i < TabletConfig.ReadRulesSize(); ++i) {
auto& rr = TabletConfig.GetReadRules(i);
for (auto& consumer : TabletConfig.GetConsumers()) {
auto scalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport();

auto scalingSupport = i < TabletConfig.ConsumerScalingSupportSize() ? TabletConfig.GetConsumerScalingSupport(i)
: DefaultScalingSupport();
auto it = oldConsumers.find(rr);
auto it = oldConsumers.find(consumer.GetName());
if (it != oldConsumers.end()) {
auto& c = Consumers[rr] = std::move(it->second);
auto& c = Consumers[consumer.GetName()] = std::move(it->second);
c.ScalingSupport = scalingSupport;
} else {
Consumers[rr].ScalingSupport = scalingSupport;
Consumers[consumer.GetName()].ScalingSupport = scalingSupport;
}
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransactio
TabletConfig = tx.GetTabletConfig();
BootstrapConfig = tx.GetBootstrapConfig();

Migrate(TabletConfig);

InitPartitions();
}

Expand Down Expand Up @@ -157,6 +159,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();

Migrate(TabletConfig);

TPartitionGraph graph = MakePartitionGraph(TabletConfig);

for (const auto& p : TabletConfig.GetPartitions()) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/user_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx,
{
TString defaultServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName();
TString userServiceType = "";
for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) {
if (Config.GetReadRules(i) == user) {
userServiceType = Config.ReadRuleServiceTypesSize() > i ? Config.GetReadRuleServiceTypes(i) : "";
for (auto& consumer : Config.GetConsumers()) {
if (consumer.GetName() == user) {
userServiceType = consumer.GetServiceType();
break;
}
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/persqueue/user_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,6 @@ class TUsersInfoStorage {
const TUserInfo* GetIfExists(const TString& user) const;
TUserInfo* GetIfExists(const TString& user);

void UpdateConfig(const NKikimrPQ::TPQTabletConfig& config) {
Config = config;
}

THashMap<TString, TUserInfo>& GetAll();

TUserInfoBase CreateUserInfo(const TString& user,
Expand Down Expand Up @@ -422,7 +418,7 @@ class TUsersInfoStorage {

TMaybe<TActorId> TabletActor;
TMaybe<TActorId> PartitionActor;
NKikimrPQ::TPQTabletConfig Config;
const NKikimrPQ::TPQTabletConfig& Config;

TString CloudId;
TString DbId;
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/persqueue/ut/common/pq_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/partition.h>
#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/core/security/ticket_parser.h>
#include <ydb/core/tablet/tablet_counters_aggregator.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
Expand Down Expand Up @@ -49,7 +50,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
}
request->Record.MutableTabletConfig()->SetCacheSize(10_MB);
request->Record.SetTxId(12345);
auto tabletConfig = request->Record.MutableTabletConfig();
auto* tabletConfig = request->Record.MutableTabletConfig();
if (runtime.GetAppData().PQConfig.GetTopicsAreFirstClassCitizen()) {
tabletConfig->SetTopicName("topic");
tabletConfig->SetTopicPath(runtime.GetAppData().PQConfig.GetDatabase() + "/topic");
Expand Down Expand Up @@ -93,6 +94,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
if (u.first != "user")
tabletConfig->AddReadRules(u.first);
}

runtime.SendToPipe(tabletId, edge, request.Release(), 0, GetPipeConfigWithRetries());
TEvPersQueue::TEvUpdateConfigResponse* result =
runtime.GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle);
Expand Down Expand Up @@ -136,7 +138,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
}


void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestContext& tc, i64 ctime,
void CmdGetOffset(const ui32 partition, const TString& user, i64 expectedOffset, TTestContext& tc, i64 ctime,
ui64 writeTime) {
TAutoPtr<IEventHandle> handle;
TEvPersQueue::TEvResponse *result;
Expand Down Expand Up @@ -174,7 +176,7 @@ void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestCo
}
}
}
UNIT_ASSERT((offset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == offset);
UNIT_ASSERT((expectedOffset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == expectedOffset);
if (writeTime > 0) {
UNIT_ASSERT(resp.HasWriteTimestampEstimateMS());
UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/pq_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ TActorId CmdCreateSession(const TPQCmdSettings& settings, TTestContext& tc);
void CmdGetOffset(
const ui32 partition,
const TString& user,
i64 offset,
i64 expectedOffset,
TTestContext& tc,
i64 ctime = -1,
ui64 writeTime = 0);
Expand Down
Loading

0 comments on commit 1cdc217

Please sign in to comment.