Skip to content

Commit

Permalink
intermediate
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Jan 10, 2024
1 parent a4220ad commit aeca961
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 56 deletions.
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {

bool TPartitionSourceManager::HasParents() const {
auto node = Partition.PartitionGraph.GetPartition(Partition.Partition);
return node && !node.value()->Parents.empty();
return node && !node->Parents.empty();
}

TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 tabletId) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
// Old configuration format without AllPartitions. Split/Merge is not supported.
continue;
}
if (node.value()->Children.empty()) {
for (const auto* r : node.value()->Parents) {
if (node->Children.empty()) {
for (const auto* r : node->Parents) {
if (extractTabletId != r->TabletId) {
Senders.insert(r->TabletId);
}
}
}

for (const auto* r : node.value()->Children) {
for (const auto* r : node->Children) {
if (r->Children.empty()) {
if (extractTabletId != r->TabletId) {
Receivers.insert(r->TabletId);
Expand Down
68 changes: 30 additions & 38 deletions ydb/core/persqueue/ut/partitiongraph_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,43 +46,35 @@ Y_UNIT_TEST_SUITE(TPartitionGraphTest) {
TPartitionGraph graph;
graph.Rebuild(config);

const auto n0o = graph.GetPartition(0);
const auto n1o = graph.GetPartition(1);
const auto n2o = graph.GetPartition(2);
const auto n3o = graph.GetPartition(3);
const auto n4o = graph.GetPartition(4);
const auto n5o = graph.GetPartition(5);

UNIT_ASSERT(n0o);
UNIT_ASSERT(n1o);
UNIT_ASSERT(n2o);
UNIT_ASSERT(n3o);
UNIT_ASSERT(n4o);
UNIT_ASSERT(n5o);

auto& n0 = *n0o.value();
auto& n1 = *n1o.value();
auto& n2 = *n2o.value();
auto& n3 = *n3o.value();
auto& n4 = *n4o.value();
auto& n5 = *n5o.value();


UNIT_ASSERT_EQUAL(n0.Parents.size(), 0);
UNIT_ASSERT_EQUAL(n0.Children.size(), 0);
UNIT_ASSERT_EQUAL(n0.HierarhicalParents.size(), 0);

UNIT_ASSERT_EQUAL(n1.Parents.size(), 0);
UNIT_ASSERT_EQUAL(n1.Children.size(), 1);
UNIT_ASSERT_EQUAL(n1.HierarhicalParents.size(), 0);

UNIT_ASSERT_EQUAL_C(n5.Parents.size(), 2, "n5.Parents.size() == " << n5.Parents.size() << " but expected 2");
UNIT_ASSERT_EQUAL_C(n5.Children.size(), 0, "n5.Children.size() == " << n5.Children.size() << " but expected 0");
UNIT_ASSERT_EQUAL_C(n5.HierarhicalParents.size(), 4, "n5.HierarhicalParents.size() == " << n5.HierarhicalParents.size() << " but expected 4");
UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n0) == n5.HierarhicalParents.end());
UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n1) != n5.HierarhicalParents.end());
UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n2) != n5.HierarhicalParents.end());
UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n3) != n5.HierarhicalParents.end());
UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n4) != n5.HierarhicalParents.end());
const auto n0 = graph.GetPartition(0);
const auto n1 = graph.GetPartition(1);
const auto n2 = graph.GetPartition(2);
const auto n3 = graph.GetPartition(3);
const auto n4 = graph.GetPartition(4);
const auto n5 = graph.GetPartition(5);

UNIT_ASSERT(n0);
UNIT_ASSERT(n1);
UNIT_ASSERT(n2);
UNIT_ASSERT(n3);
UNIT_ASSERT(n4);
UNIT_ASSERT(n5);

UNIT_ASSERT_EQUAL(n0->Parents.size(), 0);
UNIT_ASSERT_EQUAL(n0->Children.size(), 0);
UNIT_ASSERT_EQUAL(n0->HierarhicalParents.size(), 0);

UNIT_ASSERT_EQUAL(n1->Parents.size(), 0);
UNIT_ASSERT_EQUAL(n1->Children.size(), 1);
UNIT_ASSERT_EQUAL(n1->HierarhicalParents.size(), 0);

UNIT_ASSERT_EQUAL_C(n5->Parents.size(), 2, "n5.Parents.size() == " << n5->Parents.size() << " but expected 2");
UNIT_ASSERT_EQUAL_C(n5->Children.size(), 0, "n5.Children.size() == " << n5->Children.size() << " but expected 0");
UNIT_ASSERT_EQUAL_C(n5->HierarhicalParents.size(), 4, "n5.HierarhicalParents.size() == " << n5->HierarhicalParents.size() << " but expected 4");
UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n0) == n5->HierarhicalParents.end());
UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n1) != n5->HierarhicalParents.end());
UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n2) != n5->HierarhicalParents.end());
UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n3) != n5->HierarhicalParents.end());
UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n4) != n5->HierarhicalParents.end());
}
}
32 changes: 29 additions & 3 deletions ydb/core/persqueue/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ
return nullptr;
}

TPartitionGraph::TPartitionGraph() {
}

TPartitionGraph::TPartitionGraph(const NKikimrPQ::TPQTabletConfig& config) {
Rebuild(config);
}

void TPartitionGraph::Rebuild(const NKikimrPQ::TPQTabletConfig& config) {
Partitions.clear();

Expand Down Expand Up @@ -106,12 +113,31 @@ void TPartitionGraph::Rebuild(const NKikimrPQ::TPQTabletConfig& config) {
}
}

std::optional<const TPartitionGraph::Node*> TPartitionGraph::GetPartition(ui32 id) const {
const TPartitionGraph::Node* TPartitionGraph::GetPartition(ui32 id) const {
auto it = Partitions.find(id);
if (it == Partitions.end()) {
return std::nullopt;
return nullptr;
}
return &it->second;
}

std::set<ui32> TPartitionGraph::GetActiveChildren(ui32 id) const {
std::deque<const Node*> queue;
queue.push_back(GetPartition(id));

std::set<ui32> result;
while(!queue.empty()) {
const auto* n = queue.front();
queue.pop_front();

if (n->Children.empty()) {
result.emplace(n->Id);
} else {
queue.insert(queue.end(), n->Children.begin(), n->Children.end());
}
}
return std::optional(&it->second);

return result;
}

TPartitionGraph::Node::Node(const NKikimrPQ::TPQTabletConfig::TPartition& config) {
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/persqueue/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ class TPartitionGraph {
std::set<Node*> HierarhicalParents;
};

TPartitionGraph();
TPartitionGraph(const NKikimrPQ::TPQTabletConfig& config);

void Rebuild(const NKikimrPQ::TPQTabletConfig& config);

std::optional<const Node*> GetPartition(ui32 id) const;
const Node* GetPartition(ui32 id) const;
std::set<ui32> GetActiveChildren(ui32 id) const;
private:
std::unordered_map<ui32, Node> Partitions;
};
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/persqueue/writer/partition_chooser.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ struct TEvPartitionChooser {
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER)");

struct TEvChooseResult: public TEventLocal<TEvChooseResult, EvChooseResult> {
TEvChooseResult(ui32 partitionId, ui64 tabletId, const TString& ownerCookie)
TEvChooseResult(ui32 partitionId, ui64 tabletId, const TString& ownerCookie, std::optional<ui64> seqNo)
: PartitionId(partitionId)
, TabletId(tabletId)
, OwnerCookie(ownerCookie) {
, OwnerCookie(ownerCookie)
, SeqNo(seqNo) {
}

ui32 PartitionId;
ui64 TabletId;
TString OwnerCookie;
std::optional<ui64> SeqNo;
};

struct TEvChooseError: public TEventLocal<TEvChooseError, EvChooseError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,18 @@ class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {
OnOwnership(ctx);
}

void HandleOwnership(TEvTabletPipe::TEvClientConnected::TPtr& , const NActors::TActorContext& ) {}
void HandleOwnership(TEvTabletPipe::TEvClientDestroyed::TPtr& , const NActors::TActorContext& ) {}
void HandleOwnership(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) {
auto msg = ev->Get();
if (msg->Status != NKikimrProto::OK) {
TableHelper.CloseKqpSession(ctx);
ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx);
}
}

void HandleOwnership(TEvTabletPipe::TEvClientDestroyed::TPtr& , const NActors::TActorContext& ctx) {
TableHelper.CloseKqpSession(ctx);
ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx);
}

virtual void OnOwnership(const TActorContext &ctx) = 0;

Expand Down Expand Up @@ -285,7 +295,7 @@ class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {

protected:
void ReplyResult(const NActors::TActorContext& ctx) {
ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId, TThis::OwnerCookie));
ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId, TThis::OwnerCookie, SeqNo));
}

void ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) {
Expand All @@ -310,6 +320,7 @@ class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {
bool PartitionPersisted = false;

TString OwnerCookie;
std::optional<ui64> SeqNo = 0;
};

#undef LOG_PREFIX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "common.h"
#include "pipe_utils.h"
#include "source_id_encoding.h"

#include <ydb/core/persqueue/events/global.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
Expand Down Expand Up @@ -35,6 +36,15 @@ class TPartitionHelper {
NTabletPipe::SendData(ctx, Pipe, ev.Release());
}

void SendMaxSeqNoRequest(ui32 partitionId, const TString& sourceId, const TActorContext& ctx) {
auto ev = MakeRequest(partitionId, Pipe);

auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetMaxSeqNo();
cmd.AddSourceId(NSourceIdEncoding::EncodeSimple(sourceId));

NTabletPipe::SendData(ctx, Pipe, ev.Release());
}

void Close(const TActorContext& ctx) {
if (Pipe) {
NTabletPipe::CloseClient(ctx, Pipe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "partition_chooser_impl__abstract_chooser_actor.h"

#include <ydb/core/persqueue/utils.h>

namespace NKikimr::NPQ::NPartitionChooser {

#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR)
Expand All @@ -15,7 +17,7 @@ namespace NKikimr::NPQ::NPartitionChooser {
<< ") "
#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
#define INFO(message) LOG_INFO_S (*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);

template<typename TPipeCreator>
Expand All @@ -26,12 +28,14 @@ class TSMPartitionChooserActor: public TAbstractPartitionChooserActor<TSMPartiti
using TParentActor = TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator>;

TSMPartitionChooserActor(TActorId parentId,
const NKikimrSchemeOp::TPersQueueGroupDescription& /*config*/,
const NKikimrSchemeOp::TPersQueueGroupDescription& config,
std::shared_ptr<IPartitionChooser>& chooser,
NPersQueue::TTopicConverterPtr& fullConverter,
const TString& sourceId,
std::optional<ui32> preferedPartition)
: TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator>(parentId, chooser, fullConverter, sourceId, preferedPartition) {
: TAbstractPartitionChooserActor<TSMPartitionChooserActor<TPipeCreator>, TPipeCreator>(parentId, chooser, fullConverter, sourceId, preferedPartition)
, Graph(config.GetPQTabletConfig()) {

}

void Bootstrap(const TActorContext& ctx) {
Expand All @@ -46,18 +50,37 @@ class TSMPartitionChooserActor: public TAbstractPartitionChooserActor<TSMPartiti
}

void OnSelected(const TActorContext &ctx) override {
OldPartition = TThis::Partition;

if (BoundaryPartition == TThis::Partition) {
return OnPartitionChosen(ctx);
}

if (!TThis::TableHelper.PartitionId()) {
if (!TThis::TableHelper.PartitionId() || !TThis::Partition) {
TThis::Partition = BoundaryPartition;
return OnPartitionChosen(ctx);
}

if (!TThis::Partition) {
auto activeChildren = Graph.GetActiveChildren(TThis::TableHelper.PartitionId().value());
if (activeChildren.empty()) {
return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "has't active partition Marker# PC01", ctx);
}
if (activeChildren.contains(BoundaryPartition->PartitionId)) {
TThis::Partition = BoundaryPartition;
} else {
auto n = RandomNumber<size_t>(activeChildren.size());
std::vector<ui32> ac;
ac.reserve(activeChildren.size());
ac.insert(ac.end(), activeChildren.begin(), activeChildren.end());
auto id = ac[n];
TThis::Partition = TThis::Chooser->GetPartition(id);
}

if (!TThis::Partition) {
return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "can't choose partition Marker# PC02", ctx);
}

GetOldSeqNo(ctx);
}

void OnOwnership(const TActorContext &ctx) override {
Expand All @@ -66,6 +89,59 @@ class TSMPartitionChooserActor: public TAbstractPartitionChooserActor<TSMPartiti
}

private:
void GetOldSeqNo(const TActorContext &ctx) {
DEBUG("GetOldSeqNo");
TThis::Become(&TThis::StateGetMaxSeqNo);

if (!OldPartition) {
return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "Inconsistent status Marker# PC03", ctx);
}

TThis::PartitionHelper.Open(OldPartition->TabletId, ctx);
TThis::PartitionHelper.SendMaxSeqNoRequest(OldPartition->PartitionId, TThis::SourceId, ctx);
}

void HandleMaxSeqNo(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;

TString error;
if (!BasicCheck(record, error)) {
return TThis::ReplyError(ErrorCode::INITIALIZING, std::move(error), ctx);
}

const auto& response = record.GetPartitionResponse();
if (!response.HasCmdGetMaxSeqNoResult()) {
return TThis::ReplyError(ErrorCode::INITIALIZING, "Absent MaxSeqNo result", ctx);
}

const auto& result = response.GetCmdGetMaxSeqNoResult();
if (result.SourceIdInfoSize() < 1) {
return TThis::ReplyError(ErrorCode::INITIALIZING, "Empty source id info", ctx);
}

const auto& sourceIdInfo = result.GetSourceIdInfo(0);
switch (sourceIdInfo.GetState()) {
case NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED:
TThis::SeqNo = sourceIdInfo.GetSeqNo();
break;
case NKikimrPQ::TMessageGroupInfo::STATE_PENDING_REGISTRATION:
case NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN:
TThis::SeqNo = 0; // TODO from table
break;
}

TThis::PartitionHelper.Close(ctx);
TThis::StartGetOwnership(ctx);
}

STATEFN(StateGetMaxSeqNo) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvPersQueue::TEvResponse, HandleMaxSeqNo);
SFunc(TEvents::TEvPoison, TThis::Die);
HFunc(TEvTabletPipe::TEvClientConnected, TThis::HandleOwnership);
HFunc(TEvTabletPipe::TEvClientDestroyed, TThis::HandleOwnership);
}
}


private:
Expand Down Expand Up @@ -108,6 +184,8 @@ class TSMPartitionChooserActor: public TAbstractPartitionChooserActor<TSMPartiti

private:
const TPartitionInfo* BoundaryPartition = nullptr;
const TPartitionInfo* OldPartition = nullptr;
const TPartitionGraph Graph;
};

#undef LOG_PREFIX
Expand Down

0 comments on commit aeca961

Please sign in to comment.