Skip to content

Commit

Permalink
Prohibit commit to the past for inactive partitions (ydb-platform#4170)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Apr 27, 2024
1 parent 3d12c62 commit 1b8616b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
9 changes: 9 additions & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,15 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
*/
}

if (!IsActive() && act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && static_cast<i64>(EndOffset) == userInfo.Offset && offset < EndOffset) {
TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
ScheduleReplyError(act.Cookie,
NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_PAST,
TStringBuilder() << "set offset " << act.Offset << " to past for consumer " << act.ClientId << " for inactive partition");

return;
}

EmulatePostProcessUserAct(act, userInfo, ctx);
}

Expand Down
29 changes: 29 additions & 0 deletions ydb/core/persqueue/ut/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,35 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
readSession1.Close();
readSession2.Close();
}

Y_UNIT_TEST(CommitTopPast) {
TTopicSdkTestSetup setup = CreateSetup();
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);

TTopicClient client = setup.MakeClient();

auto writeSession = CreateWriteSession(client, "producer-1", 0);
UNIT_ASSERT(writeSession->Write(Msg("message_1", 2)));
UNIT_ASSERT(writeSession->Write(Msg("message_2", 3)));

ui64 txId = 1023;
SplitPartition(setup, ++txId, 0, "a");

auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer has just started reading the inactive partition and he can commit");

status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 1).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages forward.");

status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages back.");

status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 2).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit at the end of the inactive partition.");

status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::BAD_REQUEST, status.GetStatus(), "The consumer cannot commit an offset for inactive, read-to-the-end partitions.");
}
}

} // namespace NKikimr

0 comments on commit 1b8616b

Please sign in to comment.