Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Jan 10, 2024
1 parent c1d2497 commit 3e7db58
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 25 deletions.
10 changes: 5 additions & 5 deletions ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace NKikimr::NPQ {

IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId);
bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node);
bool IsResearchRequires(const TPartitionGraph::Node* node);

//
// TPartitionSourceManager
Expand Down Expand Up @@ -37,7 +37,7 @@ void TPartitionSourceManager::ScheduleBatch() {

PendingSourceIds = std::move(UnknownSourceIds);

for(const auto* parent : node.value()->HierarhicalParents) {
for(const auto* parent : node->HierarhicalParents) {
PendingCookies.insert(++Cookie);

TActorId actorId = PartitionRequester(parent->Id, parent->TabletId);
Expand Down Expand Up @@ -141,7 +141,7 @@ void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const
}
}

TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const {
const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const {
return Partition.PartitionGraph.GetPartition(Partition.Partition);
}

Expand Down Expand Up @@ -484,8 +484,8 @@ IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId p
return new TSourceIdRequester(parent, partition, tabletId);
}

bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node) {
return node && !node.value()->Parents.empty();
bool IsResearchRequires(const TPartitionGraph::Node* node) {
return node && !node->Parents.empty();
}

NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/partition_sourcemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TPartition;

class TPartitionSourceManager {
private:
using TPartitionNode = std::optional<const TPartitionGraph::Node *>;
using TPartitionNode = TPartitionGraph::Node;

public:
using TPartitionId = ui32;
Expand Down Expand Up @@ -96,7 +96,7 @@ class TPartitionSourceManager {
private:
TPartitionSourceManager& Manager;

TPartitionNode Node;
const TPartitionNode* Node;
TSourceIdWriter SourceIdWriter;
THeartbeatEmitter HeartbeatEmitter;
};
Expand Down Expand Up @@ -125,7 +125,7 @@ class TPartitionSourceManager {
void FinishBatch(const TActorContext& ctx);
bool RequireEnqueue(const TString& sourceId);

TPartitionNode GetPartitionNode() const;
const TPartitionNode* GetPartitionNode() const;
TSourceIdStorage& GetSourceIdStorage() const;
bool HasParents() const;

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS
resp.SetErrorCode(NPersQueue::NErrorCode::OK);
auto* r = resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult();
r->SetOwnerCookie(cookie);
r->SetStatus(PartitionConfig->GetStatus());
r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
r->SetRegistered(registered);

ctx.Send(Tablet, response.Release());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
TIMEOUT(3000)
ELSE()
SIZE(MEDIUM)
TIMEOUT(60)
TIMEOUT(600)
ENDIF()

PEERDIR(
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/persqueue/writer/partition_chooser_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ IActor* CreatePartitionChooserActor(TActorId parentId,
std::optional<ui32> preferedPartition,
bool withoutHash) {
auto chooser = CreatePartitionChooser(config, withoutHash);
return new NPartitionChooser::TPartitionChooserActor<NTabletPipe::TPipeHelper>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
if (SplitMergeEnabled(config.GetPQTabletConfig())) {
return new NPartitionChooser::TSMPartitionChooserActor<NTabletPipe::TPipeHelper>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
} else {
return new NPartitionChooser::TPartitionChooserActor<NTabletPipe::TPipeHelper>(parentId, config, chooser, fullConverter, sourceId, preferedPartition);
}
}

} // namespace NKikimr::NPQ
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class TTableHelper {
PartitionId_ = tt.GetOptional().GetUint32();
CreateTime = list.GetStruct(1).GetOptional().GetUint64();
AccessTime = accessTime;
SeqNo_ = list.GetStruct(3).GetOptional().GetUint64();
//SeqNo_ = list.GetStruct(3).GetOptional().GetUint64();
}
}
}
Expand Down
29 changes: 16 additions & 13 deletions ydb/core/persqueue/writer/source_id_encoding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ TString GetSelectSourceIdQueryFromPath(const TString& path, ESourceIdTableGenera
"DECLARE $Hash AS Uint32; "
"DECLARE $Topic AS Utf8; "
"DECLARE $SourceId AS Utf8; "
"SELECT Partition, CreateTime, AccessTime, SeqNo FROM `" << path << "` "
"SELECT Partition, CreateTime, AccessTime FROM `" << path << "` "
//"SELECT Partition, CreateTime, AccessTime, SeqNo FROM `" << path << "` "
"WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId;";
case ESourceIdTableGeneration::PartitionMapping:
return TStringBuilder() << "--!syntax_v1\n"
"DECLARE $Hash AS Uint64; "
"DECLARE $Topic AS Utf8; "
"DECLARE $SourceId AS Utf8; "
"SELECT Partition, CreateTime, AccessTime, SeqNo FROM `"
"SELECT Partition, CreateTime, AccessTime FROM `"
//"SELECT Partition, CreateTime, AccessTime, SeqNo FROM `"
<< NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
<< "` WHERE Hash == $Hash AND Topic == $Topic AND ProducerId == $SourceId;";
default:
Expand Down Expand Up @@ -59,10 +61,12 @@ TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGenera
"DECLARE $Hash AS Uint32; "
"DECLARE $Partition AS Uint32; "
"DECLARE $CreateTime AS Uint64; "
"DECLARE $AccessTime AS Uint64; "
"DECLARE $SeqNo AS Uint64;\n"
"UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition, SeqNo) VALUES "
"($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition, $SeqNo);";
"DECLARE $AccessTime AS Uint64;\n"
"UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES "
"($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);";
//"DECLARE $SeqNo AS Uint64;\n"
//"UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition, SeqNo) VALUES "
// "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition, $SeqNo);";
case ESourceIdTableGeneration::PartitionMapping:
return TStringBuilder() << "--!syntax_v1\n"
"DECLARE $SourceId AS Utf8; "
Expand All @@ -71,10 +75,13 @@ TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGenera
"DECLARE $Partition AS Uint32; "
"DECLARE $CreateTime AS Uint64; "
"DECLARE $AccessTime AS Uint64; "
"DECLARE $SeqNo AS Uint64;\n"
"UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
<< "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition, SeqNo) VALUES "
"($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition, $SeqNo);";
<< "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition) VALUES "
"($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);";
//"DECLARE $SeqNo AS Uint64;\n"
//"UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath()
// << "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition, SeqNo) VALUES "
// "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition, $SeqNo);";
default:
Y_ABORT();
}
Expand All @@ -88,9 +95,7 @@ TString GetUpdateAccessTimeQueryFromPath(const TString& path, ESourceIdTableGene
"DECLARE $Topic AS Utf8; "
"DECLARE $Hash AS Uint32; "
"DECLARE $Partition AS Uint32; "
"DECLARE $CreateTime AS Uint64; "
"DECLARE $AccessTime AS Uint64; "
"DECLARE $SeqNo AS Uint64;\n"
"UPDATE `" << path << "` "
"SET AccessTime = $AccessTime "
"WHERE Hash = $Hash AND Topic = $Topic AND Partition = $Partition AND SourceId = $SourceId;";
Expand All @@ -100,9 +105,7 @@ TString GetUpdateAccessTimeQueryFromPath(const TString& path, ESourceIdTableGene
"DECLARE $Topic AS Utf8; "
"DECLARE $Hash AS Uint64; "
"DECLARE $Partition AS Uint32; "
"DECLARE $CreateTime AS Uint64; "
"DECLARE $AccessTime AS Uint64; "
"DECLARE $SeqNo AS Uint64;\n"
"UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() << "` "
"SET AccessTime = $AccessTime "
"WHERE Hash = $Hash AND Topic = $Topic AND ProducerId = $SourceId AND Partition = $Partition;";
Expand Down

0 comments on commit 3e7db58

Please sign in to comment.