Skip to content

Commit

Permalink
Improove tablet generation value in Topic protocol (ydb-platform#2375) (
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Mar 4, 2024
1 parent 960afca commit f2b72c6
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 10 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ ydb/services/persqueue_v1/ut TPersQueueTest.CheckACLForGrpcWrite
ydb/services/persqueue_v1/ut TPersQueueTest.DirectRead*
ydb/services/persqueue_v1/ut TPersQueueTest.SetupLockSession
ydb/services/persqueue_v1/ut TPersQueueTest.TopicServiceCommitOffsetBadOffsets
ydb/services/persqueue_v1/ut TPersQueueTest.UpdatePartitionLocation
ydb/services/persqueue_v1/ut TPQCompatTest.BadTopics
ydb/services/persqueue_v1/ut [3/10]*
ydb/services/ydb/sdk_sessions_pool_ut YdbSdkSessionsPool.StressTestSync*
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {

LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"init complete for topic '" << TopicName() << "' partition " << Partition << " " << ctx.SelfID
"init complete for topic '" << TopicName() << "' partition " << Partition << " generation " << TabletGeneration << " " << ctx.SelfID
);

TStringBuilder ss;
Expand Down
7 changes: 3 additions & 4 deletions ydb/services/persqueue_v1/actors/partition_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo


if (!StartReading) {
ctx.Send(ParentId, new TEvPQProxy::TEvPartitionStatus(Partition, CommittedOffset, EndOffset, WriteTimestampEstimateMs, TabletGeneration, NodeId));
ctx.Send(ParentId, new TEvPQProxy::TEvPartitionStatus(Partition, CommittedOffset, EndOffset, WriteTimestampEstimateMs, NodeId, TabletGeneration));
} else {
InitStartReading(ctx);
}
Expand Down Expand Up @@ -802,15 +802,14 @@ void TPartitionActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const
TEvTabletPipe::TEvClientConnected *msg = ev->Get();

LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << Partition
<< " pipe restart attempt " << PipeGeneration << " pipe creation result: " << msg->Status);
<< " pipe restart attempt " << PipeGeneration << " pipe creation result: " << msg->Status
<< " TabletId: " << msg->TabletId << " Generation: " << msg->Generation);

if (msg->Status != NKikimrProto::OK) {
RestartPipe(ctx, TStringBuilder() << "pipe to tablet is dead " << msg->TabletId, NPersQueue::NErrorCode::TABLET_PIPE_DISCONNECTED);
return;
}

auto prevGeneration = TabletGeneration;
Y_UNUSED(prevGeneration);
TabletGeneration = msg->Generation;
NodeId = msg->ServerId.NodeId();

Expand Down
8 changes: 4 additions & 4 deletions ydb/services/persqueue_v1/persqueue_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
TPersQueueV1TestServer server;
SET_LOCALS;
MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService);
server.EnablePQLogs({ NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY});
server.EnablePQLogs({ NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY, NKikimrServices::PERSQUEUE});
server.EnablePQLogs({ NKikimrServices::KQP_PROXY }, NLog::EPriority::PRI_EMERG);
server.EnablePQLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD }, NLog::EPriority::PRI_ERROR);

Expand All @@ -742,8 +742,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}

// await and confirm CreatePartitionStreamRequest from server
i64 assignId = 0;
i64 generation = 0;
i64 assignId;
i64 generation;
{
Ydb::Topic::StreamReadMessage::FromServer resp;

Expand All @@ -756,8 +756,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(resp.start_partition_session_request().partition_session().path(), "acc/topic1");
UNIT_ASSERT(resp.start_partition_session_request().partition_session().partition_id() == 0);
UNIT_ASSERT(resp.start_partition_session_request().partition_location().generation() > 0);
generation = resp.start_partition_session_request().partition_location().generation();
assignId = resp.start_partition_session_request().partition_session().partition_session_id();
generation = resp.start_partition_session_request().partition_location().generation();
}

server.Server->AnnoyingClient->RestartPartitionTablets(server.Server->CleverServer->GetRuntime(), "rt3.dc1--acc--topic1");
Expand Down

0 comments on commit f2b72c6

Please sign in to comment.