Skip to content

Commit

Permalink
intermediate
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Jan 10, 2024
1 parent aeca961 commit c1d2497
Show file tree
Hide file tree
Showing 9 changed files with 376 additions and 281 deletions.
86 changes: 64 additions & 22 deletions ydb/core/persqueue/ut/partition_chooser_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,65 @@ NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig(bool SplitMergeEnabled)
config->SetTopicName("/Root/topic-1");
config->SetTopicPath("/Root");

auto* p0 = result.AddPartitions();
p0->SetPartitionId(0);
p0->SetTabletId(1000);
p0->MutableKeyRange()->SetToBound("C");

auto* p1 = result.AddPartitions();
p1->SetPartitionId(1);
p1->SetTabletId(1001);
p1->MutableKeyRange()->SetFromBound("C");
p1->MutableKeyRange()->SetToBound("F");

auto* p2 = result.AddPartitions();
p2->SetPartitionId(2);
p2->SetTabletId(1002);
p2->MutableKeyRange()->SetFromBound("F");

auto* p3 = result.AddPartitions();
p3->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Inactive);
p3->SetPartitionId(3);
p3->SetTabletId(1003);
p3->MutableKeyRange()->SetFromBound("D");
{
auto* p = result.AddPartitions();
p->SetPartitionId(0);
p->SetTabletId(1000);
p->MutableKeyRange()->SetToBound("C");
}

{
auto* p = result.MutablePQTabletConfig()->AddAllPartitions();
p->SetPartitionId(0);
p->SetTabletId(1000);
p->MutableKeyRange()->SetToBound("C");
}

{
auto* p = result.AddPartitions();
p->SetPartitionId(1);
p->SetTabletId(1001);
p->MutableKeyRange()->SetFromBound("C");
p->MutableKeyRange()->SetToBound("F");
}

{
auto* p = result.MutablePQTabletConfig()->AddAllPartitions();
p->SetPartitionId(1);
p->SetTabletId(1001);
p->MutableKeyRange()->SetFromBound("C");
p->MutableKeyRange()->SetToBound("F");
}

{
auto* p = result.AddPartitions();
p->SetPartitionId(2);
p->SetTabletId(1002);
p->MutableKeyRange()->SetFromBound("F");
}

{
auto* p = result.MutablePQTabletConfig()->AddAllPartitions();
p->SetPartitionId(2);
p->SetTabletId(1002);
p->MutableKeyRange()->SetFromBound("F");
}

{
auto* p = result.AddPartitions();
p->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Inactive);
p->SetPartitionId(3);
p->SetTabletId(1003);
p->MutableKeyRange()->SetFromBound("D");
}

{
auto* p = result.MutablePQTabletConfig()->AddAllPartitions();
p->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Inactive);
p->SetPartitionId(3);
p->SetTabletId(1003);
p->MutableKeyRange()->SetFromBound("D");
}

return result;
}
Expand Down Expand Up @@ -269,9 +307,12 @@ TPQTabletMock* CreatePQTabletMock(NPersQueue::TTestServer& server, ui64 tabletId

Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_Test) {
NPersQueue::TTestServer server{};
server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true);
server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetUseSrcIdMetaMappingInFirstClass(true);
server.EnableLogs({NKikimrServices::PQ_PARTITION_CHOOSER}, NActors::NLog::PRI_TRACE);

CreatePQTabletMock(server, 1000, ETopicPartitionStatus::Active);
CreatePQTabletMock(server, 1001, ETopicPartitionStatus::Active);
CreatePQTabletMock(server, 1002, ETopicPartitionStatus::Active);

{
Expand Down Expand Up @@ -307,7 +348,8 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_Test) {
{
// Use prefered partition, but sourceId not in partition boundary
auto r = ChoosePartition(server, SMEnabled, "A_Source_1", 1);
UNIT_ASSERT(r->Error);
UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
}
}

Expand Down
13 changes: 12 additions & 1 deletion ydb/core/persqueue/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,19 @@ const TPartitionGraph::Node* TPartitionGraph::GetPartition(ui32 id) const {
}

std::set<ui32> TPartitionGraph::GetActiveChildren(ui32 id) const {
const auto* p = GetPartition(id);
if (!p) {
Cerr << ">>>>> Unknown partition p=" << id << Endl;
Cerr << ">>>>> Known: ";
for(const auto& [k,_] : Partitions) {
Cerr << k << ", ";
}
Cerr << Endl;
return {};
}

std::deque<const Node*> queue;
queue.push_back(GetPartition(id));
queue.push_back(p);

std::set<ui32> result;
while(!queue.empty()) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/persqueue/writer/metadata_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ void TSrcIdMetaInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr cont
column.set_name("Partition");
column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
}
{
auto& column = *request.add_columns();
column.set_name("SeqNo");
column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64);
}
{
auto* partSettings = request.mutable_partitioning_settings();
partSettings->add_partition_by("Hash");
Expand Down
231 changes: 0 additions & 231 deletions ydb/core/persqueue/writer/partition_chooser_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,237 +32,6 @@ TString TMd5Converter::operator()(const TString& sourceId) const {
return AsKeyBound(Hash(sourceId));
}


#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR)
#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR"
#endif


#define LOG_PREFIX "TTableHelper "
#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);
#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message);

TTableHelper::TTableHelper(const TString& topicName, const TString& topicHashName)
: TopicName(topicName)
, TopicHashName(topicHashName) {
}

std::optional<ui32> TTableHelper::PartitionId() const {
return PartitionId_;
}

bool TTableHelper::Initialize(const TActorContext& ctx, const TString& sourceId) {
const auto& pqConfig = AppData(ctx)->PQConfig;

TableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping
: ESourceIdTableGeneration::SrcIdMeta2;
try {
EncodedSourceId = NSourceIdEncoding::EncodeSrcId(
TopicHashName, sourceId, TableGeneration
);
} catch (yexception& e) {
return false;
}

SelectQuery = GetSelectSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration);
UpdateQuery = GetUpdateSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration);

DEBUG("SelectQuery: " << SelectQuery);
DEBUG("UpdateQuery: " << UpdateQuery);

return true;
}

TString TTableHelper::GetDatabaseName(const NActors::TActorContext& ctx) {
const auto& pqConfig = AppData(ctx)->PQConfig;
switch (TableGeneration) {
case ESourceIdTableGeneration::SrcIdMeta2:
return NKikimr::NPQ::GetDatabaseFromConfig(pqConfig);
case ESourceIdTableGeneration::PartitionMapping:
return AppData(ctx)->TenantName;
}
}

void TTableHelper::SendInitTableRequest(const NActors::TActorContext& ctx) {
ctx.Send(
NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()),
new NMetadata::NProvider::TEvPrepareManager(NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant())
);
}

void TTableHelper::SendCreateSessionRequest(const NActors::TActorContext& ctx) {
auto ev = MakeCreateSessionRequest(ctx);
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
}

THolder<NKqp::TEvKqp::TEvCreateSessionRequest> TTableHelper::MakeCreateSessionRequest(const NActors::TActorContext& ctx) {
auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
return ev;
}

bool TTableHelper::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext&) {
const auto& record = ev->Get()->Record;

if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
return false;
}

KqpSessionId = record.GetResponse().GetSessionId();
Y_ABORT_UNLESS(!KqpSessionId.empty());

return true;
}

void TTableHelper::CloseKqpSession(const TActorContext& ctx) {
if (KqpSessionId) {
auto ev = MakeCloseSessionRequest();
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());

KqpSessionId = "";
}
}

THolder<NKqp::TEvKqp::TEvCloseSessionRequest> TTableHelper::MakeCloseSessionRequest() {
auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
return ev;
}

void TTableHelper::SendSelectRequest(const NActors::TActorContext& ctx) {
auto ev = MakeSelectQueryRequest(ctx);
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
}

THolder<NKqp::TEvKqp::TEvQueryRequest> TTableHelper::MakeSelectQueryRequest(const NActors::TActorContext& ctx) {
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();

ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
ev->Record.MutableRequest()->SetQuery(SelectQuery);

ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
// fill tx settings: set commit tx flag& begin new serializable tx.
ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false);
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);

NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder();

SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId);

paramsBuilder
.AddParam("$Topic")
.Utf8(TopicName)
.Build()
.AddParam("$SourceId")
.Utf8(EncodedSourceId.EscapedSourceId)
.Build();

NYdb::TParams params = paramsBuilder.Build();

ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params)));

return ev;
}

bool TTableHelper::HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
auto& record = ev->Get()->Record.GetRef();

if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
return false;
}

auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0);

TxId = record.GetResponse().GetTxMeta().id();
Y_ABORT_UNLESS(!TxId.empty());

if (t.ListSize() != 0) {
auto& tt = t.GetList(0).GetStruct(0);
if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition
auto accessTime = t.GetList(0).GetStruct(2).GetOptional().GetUint64();
if (accessTime > AccessTime) { // AccessTime
PartitionId_ = tt.GetOptional().GetUint32();
CreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64();
AccessTime = accessTime;
}
}
}

if (CreateTime == 0) {
CreateTime = TInstant::Now().MilliSeconds();
}

return true;
}


void TTableHelper::SendUpdateRequest(ui32 partitionId, const TActorContext& ctx) {
auto ev = MakeUpdateQueryRequest(partitionId, ctx);
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
}

THolder<NKqp::TEvKqp::TEvQueryRequest> TTableHelper::MakeUpdateQueryRequest(ui32 partitionId, const NActors::TActorContext& ctx) {
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();

ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
ev->Record.MutableRequest()->SetQuery(UpdateQuery);
ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
// fill tx settings: set commit tx flag& begin new serializable tx.
ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
if (KqpSessionId) {
ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
}
if (TxId) {
ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId);
TxId = "";
} else {
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
}
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);

NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder();

SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId);

paramsBuilder
.AddParam("$Topic")
.Utf8(TopicName)
.Build()
.AddParam("$SourceId")
.Utf8(EncodedSourceId.EscapedSourceId)
.Build()
.AddParam("$CreateTime")
.Uint64(CreateTime)
.Build()
.AddParam("$AccessTime")
.Uint64(TInstant::Now().MilliSeconds())
.Build()
.AddParam("$Partition")
.Uint32(partitionId)
.Build();

NYdb::TParams params = paramsBuilder.Build();

ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params)));

return ev;
}

#undef LOG_PREFIX
#undef TRACE
#undef DEBUG
#undef INFO
#undef ERROR


} // namespace NPartitionChooser


Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/writer/partition_chooser_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class THashChooser: public IPartitionChooser {
template<class THasher>
TBoundaryChooser<THasher>::TBoundaryChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config)
: TopicName(config.GetPQTabletConfig().GetTopicName()) {
for(const auto& p : config.GetPartitions()) {
for(const auto& p : config.GetPQTabletConfig().GetAllPartitions()) {
if (NKikimrPQ::ETopicPartitionStatus::Active == p.GetStatus()) {
auto toBound = p.HasKeyRange() && p.GetKeyRange().HasToBound() ?
std::optional<TString>(p.GetKeyRange().GetToBound()) : std::nullopt;
Expand Down Expand Up @@ -127,7 +127,7 @@ const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THash
//
template<class THasher>
THashChooser<THasher>::THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config) {
for(const auto& p : config.GetPartitions()) {
for(const auto& p : config.GetPQTabletConfig().GetAllPartitions()) {
if (NKikimrPQ::ETopicPartitionStatus::Active == p.GetStatus()) {
Partitions.emplace_back(TPartitionInfo{p.GetPartitionId(),
p.GetTabletId()});
Expand Down
Loading

0 comments on commit c1d2497

Please sign in to comment.