Skip to content

Commit

Permalink
intermediate
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Feb 27, 2024
1 parent 4a3f12b commit c2f10dc
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 32 deletions.
15 changes: 7 additions & 8 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1777,25 +1777,24 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
important.insert(importantUser);
}

for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
auto& userInfo = GetOrCreatePendingUser(consumer, 0);
for (auto& consumer : config.GetConsumers()) {
auto& userInfo = GetOrCreatePendingUser(consumer.GetName(), 0);

TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs());
if (!ts) {
ts += TDuration::MilliSeconds(1);
}
userInfo.ReadFromTimestamp = ts;
userInfo.Important = important.contains(consumer);
userInfo.Important = important.contains(consumer.GetName());

ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
ui64 rrGen = consumer.GetGeneration();
if (userInfo.ReadRuleGeneration != rrGen) {
TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0, 0, TActorId{},
TEvPQ::TEvSetClientInfo act(0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{},
TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen);

ProcessUserAct(act, ctx);
}
hasReadRule.erase(consumer);
hasReadRule.erase(consumer.GetName());
}

for (auto& consumer : hasReadRule) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
switch (response.GetStatus()) {
case NKikimrProto::OK:
Y_ABORT_UNLESS(Partition()->Config.ParseFromString(response.GetValue()));

Migrate(Partition()->Config);

if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) {
auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter,
Partition()->TabletConfig);
Expand Down
17 changes: 9 additions & 8 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ void TPartition::FillReadFromTimestamps(const TActorContext& ctx) {
userInfo.HasReadRule = false;
hasReadRule.insert(consumer);
}
for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) {
const auto& consumer = Config.GetReadRules(i);
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0);

for (auto& consumer : Config.GetConsumers()) {
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer.GetName(), ctx, 0);
userInfo.HasReadRule = true;
ui64 rrGen = i < Config.ReadRuleGenerationsSize() ? Config.GetReadRuleGenerations(i) : 0;
if (userInfo.ReadRuleGeneration != rrGen) {

if (userInfo.ReadRuleGeneration != consumer.GetGeneration()) {
THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(
0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen
0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, consumer.GetGeneration()
);
//
// TODO(abcdef): заменить на вызов ProcessUserAct
Expand All @@ -61,12 +61,13 @@ void TPartition::FillReadFromTimestamps(const TActorContext& ctx) {
}
userInfo.Step = userInfo.Generation = 0;
}
hasReadRule.erase(consumer);
TInstant ts = i < Config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(Config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
hasReadRule.erase(consumer.GetName());
TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs());
if (!ts) ts += TDuration::MilliSeconds(1);
if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts)
userInfo.ReadFromTimestamp = ts;
}

for (auto& consumer : hasReadRule) {
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx);
if (userInfo.NoConsumer) {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "partition_log.h"
#include "partition.h"
#include "read.h"
#include "utils.h"

#include <ydb/core/base/tx_processing.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/persqueue/config/config.h>
Expand Down Expand Up @@ -998,6 +1000,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
bool res = Config.ParseFromString(read.GetValue());
Y_ABORT_UNLESS(res);

Migrate(Config);

if (!Config.PartitionsSize()) {
for (const auto partitionId : Config.GetPartitionIds()) {
Config.AddPartitions()->SetPartitionId(partitionId);
Expand Down Expand Up @@ -1506,6 +1510,8 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf

NKikimrPQ::TPQTabletConfig cfg = record.GetTabletConfig();

Migrate(cfg);

Y_ABORT_UNLESS(cfg.HasVersion());
int curConfigVersion = cfg.GetVersion();

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransactio
TabletConfig = tx.GetTabletConfig();
BootstrapConfig = tx.GetBootstrapConfig();

Migrate(TabletConfig);

InitPartitions();
}

Expand Down Expand Up @@ -157,6 +159,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();

Migrate(TabletConfig);

TPartitionGraph graph = MakePartitionGraph(TabletConfig);

for (const auto& p : TabletConfig.GetPartitions()) {
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/persqueue/ut/common/pq_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/partition.h>
#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/core/security/ticket_parser.h>
#include <ydb/core/tablet/tablet_counters_aggregator.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
Expand Down Expand Up @@ -49,7 +50,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
}
request->Record.MutableTabletConfig()->SetCacheSize(10_MB);
request->Record.SetTxId(12345);
auto tabletConfig = request->Record.MutableTabletConfig();
auto* tabletConfig = request->Record.MutableTabletConfig();
if (runtime.GetAppData().PQConfig.GetTopicsAreFirstClassCitizen()) {
tabletConfig->SetTopicName("topic");
tabletConfig->SetTopicPath(runtime.GetAppData().PQConfig.GetDatabase() + "/topic");
Expand Down Expand Up @@ -93,6 +94,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
if (u.first != "user")
tabletConfig->AddReadRules(u.first);
}

runtime.SendToPipe(tabletId, edge, request.Release(), 0, GetPipeConfigWithRetries());
TEvPersQueue::TEvUpdateConfigResponse* result =
runtime.GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle);
Expand Down Expand Up @@ -136,7 +138,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
}


void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestContext& tc, i64 ctime,
void CmdGetOffset(const ui32 partition, const TString& user, i64 expectedOffset, TTestContext& tc, i64 ctime,
ui64 writeTime) {
TAutoPtr<IEventHandle> handle;
TEvPersQueue::TEvResponse *result;
Expand Down Expand Up @@ -174,7 +176,7 @@ void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestCo
}
}
}
UNIT_ASSERT((offset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == offset);
UNIT_ASSERT((expectedOffset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == expectedOffset);
if (writeTime > 0) {
UNIT_ASSERT(resp.HasWriteTimestampEstimateMS());
UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/pq_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ TActorId CmdCreateSession(const TPQCmdSettings& settings, TTestContext& tc);
void CmdGetOffset(
const ui32 partition,
const TString& user,
i64 offset,
i64 expectedOffset,
TTestContext& tc,
i64 ctime = -1,
ui64 writeTime = 0);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/ut/make_config.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "make_config.h"

#include <ydb/core/persqueue/utils.h>

namespace NKikimr::NPQ::NHelpers {

NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,
Expand Down Expand Up @@ -27,6 +29,8 @@ NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,

config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);

Migrate(config);

return config;
}

Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/ut/pq_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ Y_UNIT_TEST(TestUserInfoCompatibility) {
CmdGetOffset(1, client, 1, tc);
CmdGetOffset(2, client, 1, tc);
CmdGetOffset(3, client, 1, tc);

});
}

Expand Down
9 changes: 2 additions & 7 deletions ydb/core/persqueue/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ ui64 PutUnitsSize(const ui64 size) {
}

void Migrate(NKikimrPQ::TPQTabletConfig& config) {
Cerr << ">>>>> Migrate" << Endl;
if (!config.ConsumersSize()) {
for(size_t i = 0; i < config.ReadRulesSize(); ++i) {
auto* consumer = config.AddConsumers();
Expand All @@ -60,13 +61,7 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) {
if (i < config.ConsumerCodecsSize()) {
auto& src = config.GetConsumerCodecs(i);
auto* dst = consumer->MutableCodec();

for (auto value : src.GetIds()) {
dst->AddIds(value);
}
for (auto& value : src.GetCodecs()) {
dst->AddCodecs(value);
}
dst->CopyFrom(src);
}
if (i < config.ReadRuleServiceTypesSize()) {
consumer->SetServiceType(config.GetReadRuleServiceTypes(i));
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/protos/pqconfig.proto
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,13 @@ message TPQTabletConfig {

message TConsumer {
optional string Name = 1;
optional uint64 ReadFromTimestampsMs = 2;
optional uint64 FormatVersion = 3;
optional uint64 ReadFromTimestampsMs = 2 [default = 0];
optional uint64 FormatVersion = 3 [default = 0];
optional TCodecs Codec = 4;
optional string ServiceType = 5;
optional EConsumerScalingSupport ScalingSupport = 6;
optional uint64 Version = 7;
optional uint64 Generation = 8;
optional uint64 Version = 7 [default = 0];
optional uint64 Generation = 8 [default = 0];
}

repeated TConsumer Consumers = 37;
Expand Down

0 comments on commit c2f10dc

Please sign in to comment.