Skip to content

Commit

Permalink
intermediate
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Jan 11, 2024
1 parent 3e7db58 commit 9471f9c
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 41 deletions.
31 changes: 29 additions & 2 deletions ydb/core/persqueue/ut/partition_chooser_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig(bool SplitMergeEnabled)
p->SetPartitionId(3);
p->SetTabletId(1003);
p->MutableKeyRange()->SetFromBound("D");
p->AddChildPartitionIds(4);
}

{
Expand All @@ -84,6 +85,23 @@ NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig(bool SplitMergeEnabled)
p->SetPartitionId(3);
p->SetTabletId(1003);
p->MutableKeyRange()->SetFromBound("D");
p->AddChildPartitionIds(4);
}

{
auto* p = result.AddPartitions();
p->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Active);
p->SetPartitionId(4);
p->SetTabletId(1004);
p->MutableKeyRange()->SetFromBound("D");
}

{
auto* p = result.MutablePQTabletConfig()->AddAllPartitions();
p->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Active);
p->SetPartitionId(4);
p->SetTabletId(1004);
p->MutableKeyRange()->SetFromBound("D");
}

return result;
Expand Down Expand Up @@ -281,6 +299,10 @@ class TPQTabletMock: public TActor<TPQTabletMock> {
auto* cmd = response->Record.MutablePartitionResponse()->MutableCmdGetOwnershipResult();
cmd->SetOwnerCookie("ower_cookie");
cmd->SetStatus(Status);
cmd->SetRegistered(!!SeqNo);

auto* sn = response->Record.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult()->AddSourceIdInfo();
sn->SetSeqNo(SeqNo.value_or(0));

ctx.Send(ev->Sender, response.Release());
}
Expand All @@ -294,6 +316,7 @@ class TPQTabletMock: public TActor<TPQTabletMock> {

private:
ETopicPartitionStatus Status;
std::optional<ui64> SeqNo;
};


Expand All @@ -314,6 +337,8 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_Test) {
CreatePQTabletMock(server, 1000, ETopicPartitionStatus::Active);
CreatePQTabletMock(server, 1001, ETopicPartitionStatus::Active);
CreatePQTabletMock(server, 1002, ETopicPartitionStatus::Active);
CreatePQTabletMock(server, 1003, ETopicPartitionStatus::Inactive);
CreatePQTabletMock(server, 1004, ETopicPartitionStatus::Active);

{
auto r = ChoosePartition(server, SMEnabled, "A_Source");
Expand All @@ -327,9 +352,11 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_Test) {
}
{
// Define partition for sourceId that is not in partition boundary
// We use this partition because it active
WriteToTable(server, "X_Source_w_0", 0);
auto r = ChoosePartition(server, SMEnabled, "X_Source_w_0");
UNIT_ASSERT(r->Error);
UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
}
{
// Redefine partition for sourceId. Check that partition changed;
Expand All @@ -343,7 +370,7 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_Test) {
WriteToTable(server, "A_Source_w_0", 3);
auto r = ChoosePartition(server, SMEnabled, "A_Source_w_0");
UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 4);
}
{
// Use prefered partition, but sourceId not in partition boundary
Expand Down
6 changes: 0 additions & 6 deletions ydb/core/persqueue/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,6 @@ const TPartitionGraph::Node* TPartitionGraph::GetPartition(ui32 id) const {
std::set<ui32> TPartitionGraph::GetActiveChildren(ui32 id) const {
const auto* p = GetPartition(id);
if (!p) {
Cerr << ">>>>> Unknown partition p=" << id << Endl;
Cerr << ">>>>> Known: ";
for(const auto& [k,_] : Partitions) {
Cerr << k << ", ";
}
Cerr << Endl;
return {};
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/writer/metadata_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ void TSrcIdMetaInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr cont
column.set_name("Partition");
column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
}
{
/*{
auto& column = *request.add_columns();
column.set_name("SeqNo");
column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64);
}
}*/
{
auto* partSettings = request.mutable_partitioning_settings();
partSettings->add_partition_by("Hash");
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/persqueue/writer/partition_chooser_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class THashChooser: public IPartitionChooser {
template<class THasher>
TBoundaryChooser<THasher>::TBoundaryChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config)
: TopicName(config.GetPQTabletConfig().GetTopicName()) {
for(const auto& p : config.GetPQTabletConfig().GetAllPartitions()) {
for(const auto& p : config.GetPartitions()) {
if (NKikimrPQ::ETopicPartitionStatus::Active == p.GetStatus()) {
auto toBound = p.HasKeyRange() && p.GetKeyRange().HasToBound() ?
std::optional<TString>(p.GetKeyRange().GetToBound()) : std::nullopt;
Expand Down Expand Up @@ -127,7 +127,7 @@ const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THash
//
template<class THasher>
THashChooser<THasher>::THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config) {
for(const auto& p : config.GetPQTabletConfig().GetAllPartitions()) {
for(const auto& p : config.GetPartitions()) {
if (NKikimrPQ::ETopicPartitionStatus::Active == p.GetStatus()) {
Partitions.emplace_back(TPartitionInfo{p.GetPartitionId(),
p.GetTabletId()});
Expand All @@ -140,6 +140,9 @@ THashChooser<THasher>::THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescri

template<class THasher>
const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::GetPartition(const TString& sourceId) const {
if (Partitions.empty()) {
return nullptr;
}
return &Partitions[Hasher(sourceId, Partitions.size())];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,17 @@ class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {
PartitionHelper.Close(ctx);
}

bool NeedTable(const NActors::TActorContext& ctx) {
const auto& pqConfig = AppData(ctx)->PQConfig;
return SourceId && (!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass());
}

protected:
void InitTable(const NActors::TActorContext& ctx) {
TThis::Become(&TThis::StateInitTable);
const auto& pqConfig = AppData(ctx)->PQConfig;
if (SourceId && pqConfig.GetTopicsAreFirstClassCitizen() && pqConfig.GetUseSrcIdMetaMappingInFirstClass()) {
DEBUG("InitTable");
TThis::Become(&TThis::StateInitTable);
TableHelper.SendInitTableRequest(ctx);
} else {
StartKqpSession(ctx);
Expand All @@ -90,8 +95,7 @@ class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {

protected:
void StartKqpSession(const NActors::TActorContext& ctx) {
const auto& pqConfig = AppData(ctx)->PQConfig;
if (SourceId && (!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass())) {
if (NeedTable(ctx)) {
DEBUG("StartKqpSession")
TThis::Become(&TThis::StateCreateKqpSession);
TableHelper.SendCreateSessionRequest(ctx);
Expand Down Expand Up @@ -152,7 +156,7 @@ class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {

protected:
void SendUpdateRequests(const TActorContext& ctx) {
if (SourceId) {
if (NeedTable(ctx)) {
TThis::Become(&TThis::StateUpdate);
DEBUG("Update the table");
TableHelper.SendUpdateRequest(Partition->PartitionId, ctx);
Expand Down Expand Up @@ -199,11 +203,11 @@ class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {

protected:
void StartGetOwnership(const TActorContext &ctx) {
TThis::Become(&TThis::StateOwnership);
if (!Partition) {
return ReplyError(ErrorCode::INITIALIZING, "Partition not choosed", ctx);
}

TThis::Become(&TThis::StateOwnership);
DEBUG("GetOwnership Partition TabletId=" << Partition->TabletId);

PartitionHelper.Open(Partition->TabletId, ctx);
Expand All @@ -230,20 +234,24 @@ class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {

OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie();

PartitionHelper.Close(ctx);

OnOwnership(ctx);
}

void HandleOwnership(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) {
auto msg = ev->Get();
if (msg->Status != NKikimrProto::OK) {
if (PartitionHelper.IsPipe(ev->Sender) && msg->Status != NKikimrProto::OK) {
TableHelper.CloseKqpSession(ctx);
ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx);
}
}

void HandleOwnership(TEvTabletPipe::TEvClientDestroyed::TPtr& , const NActors::TActorContext& ctx) {
TableHelper.CloseKqpSession(ctx);
ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx);
void HandleOwnership(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx) {
if (PartitionHelper.IsPipe(ev->Sender)) {
TableHelper.CloseKqpSession(ctx);
ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx);
}
}

virtual void OnOwnership(const TActorContext &ctx) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class TPartitionChooserActor: public TAbstractPartitionChooserActor<TPartitionCh
}

void Bootstrap(const TActorContext& ctx) {
TThis::Initialize(ctx);
TThis::InitTable(ctx);
TThis::Initialize(ctx);
TThis::InitTable(ctx);
}

TActorIdentity SelfId() const {
Expand Down Expand Up @@ -82,21 +82,21 @@ class TPartitionChooserActor: public TAbstractPartitionChooserActor<TPartitionCh
DEBUG("Received partition " << PartitionId << " from PQRB for SourceId=" << TThis::SourceId);
TThis::Partition = TThis::Chooser->GetPartition(PQRBHelper.PartitionId().value());

PQRBHelper.Close(ctx);

OnPartitionChosen(ctx);
}

void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) {
Y_UNUSED(ev);

if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
if (PQRBHelper.IsPipe(ev->Sender) && ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
TThis::ReplyError(ErrorCode::INITIALIZING, "Pipe connection fail", ctx);
}
}

void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx) {
Y_UNUSED(ev);

TThis::ReplyError(ErrorCode::INITIALIZING, "Pipe destroyed", ctx);
if(PQRBHelper.IsPipe(ev->Sender)) {
TThis::ReplyError(ErrorCode::INITIALIZING, "Pipe destroyed", ctx);
}
}

STATEFN(StatePQRB) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class TPartitionHelper {
return OwnerCookie_;
}

bool IsPipe(const TActorId& actorId) const {
return actorId == Pipe;
}

private:
THolder<TEvPersQueue::TEvRequest> MakeRequest(ui32 partitionId, TActorId pipe) {
auto ev = MakeHolder<TEvPersQueue::TEvRequest>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class TPQRBHelper {
}
}

bool IsPipe(const TActorId& actorId) const {
return actorId == Pipe;
}

private:
const ui64 BalancerTabletId;

Expand Down
Loading

0 comments on commit 9471f9c

Please sign in to comment.