Skip to content

Commit

Permalink
Delete deprecated code (ReadRules,..) (ydb-platform#8405)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored and stanislav-shchetinin committed Aug 30, 2024
1 parent 485265c commit 8000ee0
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 159 deletions.
4 changes: 1 addition & 3 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1746,9 +1746,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
generation = curConfigVersion;
}
c.SetGeneration(generation);
if (ReadRuleCompatible()) {
cfg.AddReadRuleGenerations(generation);
}
cfg.AddReadRuleGenerations(generation);
}
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/ut/common/pq_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ void CmdGetOffset(const ui32 partition, const TString& user, i64 expectedOffset,
}
}
}
UNIT_ASSERT((expectedOffset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == expectedOffset);
UNIT_ASSERT_C((expectedOffset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == expectedOffset,
"expectedOffset=" << expectedOffset << " resp.HasOffset()=" << resp.HasOffset() << " resp.GetOffset()=" << resp.GetOffset());
if (writeTime > 0) {
UNIT_ASSERT(resp.HasWriteTimestampEstimateMS());
UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime);
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/persqueue/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) {
}
consumer->SetImportant(IsImportantClient(config, consumer->GetName()));
}

config.ClearReadRules();
config.ClearReadFromTimestampsMs();
config.ClearConsumerFormatVersions();
config.ClearConsumerCodecs();
config.ClearReadRuleServiceTypes();
config.ClearReadRuleVersions();
config.ClearReadRuleGenerations();
}

if (!config.PartitionsSize()) {
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/persqueue/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ TString SourceIdHash(const TString& sourceId);

void Migrate(NKikimrPQ::TPQTabletConfig& config);

// This function required for marking the code which required remove after 25-1
constexpr bool ReadRuleCompatible() { return true; }

bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName);
size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config);

Expand Down
95 changes: 2 additions & 93 deletions ydb/services/lib/actors/pq_schema_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ namespace NKikimr::NGRpcProxy::V1 {
auto* consumer = config->AddConsumers();

consumer->SetName(consumerName);
if (NPQ::ReadRuleCompatible()) {
config->AddReadRules(consumerName);
}

if (rr.starting_message_timestamp_ms() < 0) {
return TMsgPqCodes(
Expand All @@ -110,9 +107,6 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
consumer->SetReadFromTimestampsMs(rr.starting_message_timestamp_ms());
if (NPQ::ReadRuleCompatible()) {
config->AddReadFromTimestampsMs(rr.starting_message_timestamp_ms());
}

if (!Ydb::PersQueue::V1::TopicSettings::Format_IsValid((int)rr.supported_format()) || rr.supported_format() == 0) {
return TMsgPqCodes(
Expand All @@ -121,9 +115,6 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
consumer->SetFormatVersion(rr.supported_format() - 1);
if (NPQ::ReadRuleCompatible()) {
config->AddConsumerFormatVersions(rr.supported_format() - 1);
}

if (rr.version() < 0) {
return TMsgPqCodes(
Expand All @@ -132,12 +123,8 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
consumer->SetVersion(rr.version());
if (NPQ::ReadRuleCompatible()) {
config->AddReadRuleVersions(rr.version());
}

auto* cct = consumer->MutableCodec();
auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr;
if (rr.supported_codecs().size() > MAX_SUPPORTED_CODECS_COUNT) {
return TMsgPqCodes(
TStringBuilder() << "supported_codecs count cannot be more than "
Expand All @@ -156,17 +143,10 @@ namespace NKikimr::NGRpcProxy::V1 {

cct->AddIds(codec - 1);
cct->AddCodecs(codecName);

if (NPQ::ReadRuleCompatible()) {
ct->CopyFrom(*cct);
}
}

if (rr.important()) {
consumer->SetImportant(true);
if (NPQ::ReadRuleCompatible()) {
config->MutablePartitionConfig()->AddImportantClientId(consumerName);
}
}

if (!rr.service_type().empty()) {
Expand All @@ -178,9 +158,6 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
consumer->SetServiceType(rr.service_type());
if (NPQ::ReadRuleCompatible()) {
config->AddReadRuleServiceTypes(rr.service_type());
}
} else {
if (pqConfig.GetDisallowDefaultClientServiceType()) {
return TMsgPqCodes(
Expand All @@ -190,9 +167,6 @@ namespace NKikimr::NGRpcProxy::V1 {
}
const auto& defaultCientServiceType = pqConfig.GetDefaultClientServiceType().GetName();
consumer->SetServiceType(defaultCientServiceType);
if (NPQ::ReadRuleCompatible()) {
config->AddReadRuleServiceTypes(defaultCientServiceType);
}
}
return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK);
}
Expand Down Expand Up @@ -238,9 +212,6 @@ namespace NKikimr::NGRpcProxy::V1 {
auto* consumer = config->AddConsumers();

consumer->SetName(consumerName);
if (NPQ::ReadRuleCompatible()) {
config->AddReadRules(consumerName);
}

if (rr.read_from().seconds() < 0) {
return TMsgPqCodes(
Expand All @@ -249,19 +220,10 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
consumer->SetReadFromTimestampsMs(rr.read_from().seconds() * 1000);
if (NPQ::ReadRuleCompatible()) {
config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000);
}

consumer->SetFormatVersion(0);
if (NPQ::ReadRuleCompatible()) {
config->AddConsumerFormatVersions(0);
}

TString serviceType;

const auto& defaultClientServiceType = pqConfig.GetDefaultClientServiceType().GetName();
serviceType = defaultClientServiceType;
TString serviceType = defaultClientServiceType;

TString passwordHash = "";
bool hasPassword = false;
Expand Down Expand Up @@ -318,17 +280,9 @@ namespace NKikimr::NGRpcProxy::V1 {
}

consumer->SetServiceType(serviceType);
if (NPQ::ReadRuleCompatible()) {
config->AddReadRuleServiceTypes(serviceType);
}

consumer->SetVersion(version);
if (NPQ::ReadRuleCompatible()) {
config->AddReadRuleVersions(version);
}

auto* cct = consumer->MutableCodec();
auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr;

for(const auto& codec : rr.supported_codecs().codecs()) {
if ((!Ydb::Topic::Codec_IsValid(codec) && codec < Ydb::Topic::CODEC_CUSTOM) || codec == 0) {
Expand All @@ -339,20 +293,13 @@ namespace NKikimr::NGRpcProxy::V1 {
}
cct->AddIds(codec - 1);
cct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM");

if (NPQ::ReadRuleCompatible()) {
ct->CopyFrom(*cct);
}
}

if (rr.important()) {
if (pqConfig.GetTopicsAreFirstClassCitizen() && !enableTopicDiskSubDomainQuota) {
return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
consumer->SetImportant(true);
if (NPQ::ReadRuleCompatible()) {
config->MutablePartitionConfig()->AddImportantClientId(consumerName);
}
}

return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK);
Expand All @@ -363,7 +310,7 @@ namespace NKikimr::NGRpcProxy::V1 {
NKikimrPQ::TPQTabletConfig* config,
const NKikimrPQ::TPQTabletConfig& originalConfig,
const TString& consumerName,
const NKikimrPQ::TPQConfig& pqConfig
const NKikimrPQ::TPQConfig& /*pqConfig*/
) {
config->ClearReadRuleVersions();
config->ClearReadRules();
Expand All @@ -374,46 +321,8 @@ namespace NKikimr::NGRpcProxy::V1 {
config->ClearReadRuleServiceTypes();
config->ClearConsumers();

if (NPQ::ReadRuleCompatible()) {
for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) {
if (importantConsumer != consumerName) {
config->MutablePartitionConfig()->AddImportantClientId(importantConsumer);
}
}
}

bool removed = false;

if (NPQ::ReadRuleCompatible()) {
for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) {
auto& readRule = originalConfig.GetReadRules(i);

if (readRule == consumerName) {
removed = true;
continue;
}

config->AddReadRuleVersions(originalConfig.GetReadRuleVersions(i));
config->AddReadRules(readRule);
config->AddReadFromTimestampsMs(originalConfig.GetReadFromTimestampsMs(i));
config->AddConsumerFormatVersions(originalConfig.GetConsumerFormatVersions(i));
auto* ct = config->AddConsumerCodecs();
for (size_t j = 0; j < originalConfig.GetConsumerCodecs(i).CodecsSize(); j++) {
ct->AddCodecs(originalConfig.GetConsumerCodecs(i).GetCodecs(j));
ct->AddIds(originalConfig.GetConsumerCodecs(i).GetIds(j));
}
if (i < originalConfig.ReadRuleServiceTypesSize()) {
config->AddReadRuleServiceTypes(originalConfig.GetReadRuleServiceTypes(i));
} else {
if (pqConfig.GetDisallowDefaultClientServiceType()) {
return TStringBuilder() << "service type cannot be empty for consumer '"
<< readRule << "'";
}
config->AddReadRuleServiceTypes(pqConfig.GetDefaultClientServiceType().GetName());
}
}
}

for (auto& consumer : originalConfig.GetConsumers()) {
if (consumerName == consumer.GetName()) {
removed = true;
Expand Down
24 changes: 3 additions & 21 deletions ydb/services/persqueue_v1/persqueue_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5028,7 +5028,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
MaxCountInPartition: 2147483647
MaxSizeInPartition: 234
LifetimeSeconds: 172800
ImportantClientId: "consumer"
SourceIdLifetimeSeconds: 1382400
WriteSpeedInBytesPerSecond: 123
BurstSize: 1000
Expand Down Expand Up @@ -5078,31 +5077,13 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
Ident: "acc"
Topic: "topic3"
DC: "dc1"
ReadRules: "first-consumer"
ReadRules: "consumer"
ReadFromTimestampsMs: 11223344000
ReadFromTimestampsMs: 111000
ConsumerFormatVersions: 0
ConsumerFormatVersions: 0
ConsumerCodecs {
}
ConsumerCodecs {
Ids: 2
Ids: 10004
Codecs: "lzop"
Codecs: "CUSTOM"
}
ReadRuleServiceTypes: "data-streams"
ReadRuleServiceTypes: "data-streams"
FormatVersion: 0
Codecs {
Ids: 2
Ids: 10004
Codecs: "lzop"
Codecs: "CUSTOM"
}
ReadRuleVersions: 0
ReadRuleVersions: 567
TopicPath: "/Root/PQ/rt3.dc1--acc--topic3"
YdbDatabasePath: "/Root"
Consumers {
Expand All @@ -5113,7 +5094,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
}
ServiceType: "data-streams"
Version: 0
Important: false
}
Consumers {
Name: "consumer"
Expand Down Expand Up @@ -6101,6 +6081,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};


auto checkDescribe = [&](const TString& topic, const TVector<std::pair<TString, TString>>& readRules) {
Cerr << ">>>>> Check topic: " << topic << Endl;
DescribeTopicRequest request;
DescribeTopicResponse response;
request.set_path(topic);
Expand All @@ -6110,7 +6091,8 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
UNIT_ASSERT(status.ok());
DescribeTopicResult res;
response.operation().result().UnpackTo(&res);
Cerr << response << "\n" << res << "\n";
Cerr << ">>>>> Response: " << response << Endl;
Cerr << ">>>>> Result:" << res << "\n";
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);

UNIT_ASSERT_VALUES_EQUAL(res.settings().read_rules().size(), readRules.size());
Expand Down
Loading

0 comments on commit 8000ee0

Please sign in to comment.