From 14def10cb4bffa6d021d702af003c59b12928ba0 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Mon, 18 Dec 2023 12:11:25 +0300 Subject: [PATCH] Optimize heartbeats emission KIKIMR-20392 --- ydb/core/persqueue/partition.cpp | 1 + ydb/core/persqueue/partition.h | 1 + .../persqueue/partition_sourcemanager.cpp | 11 +- ydb/core/persqueue/partition_sourcemanager.h | 4 +- ydb/core/persqueue/partition_write.cpp | 71 ++++----- ydb/core/persqueue/sourceid.cpp | 97 +++++++++---- ydb/core/persqueue/sourceid.h | 19 ++- ydb/core/persqueue/ut/sourceid_ut.cpp | 137 ++++++++++++++++++ 8 files changed, 262 insertions(+), 79 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 7d814dd3f26a..64aebab88e6b 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -187,6 +187,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui , NumChannels(numChannels) , WriteBufferIsFullCounter(nullptr) , WriteLagMs(TDuration::Minutes(1), 100) + , LastEmittedHeartbeat(TRowVersion::Min()) { TabletCounters.Populate(Counters); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 914e330e7cd3..dd0fa7e36eae 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -755,6 +755,7 @@ class TPartition : public TActorBootstrapped { TInstant LastUsedStorageMeterTimestamp; TDeque> PendingEvents; + TRowVersion LastEmittedHeartbeat; }; } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index d6aabef3e89d..0e8f6d781274 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -236,7 +236,7 @@ TPartitionSourceManager::TModificationBatch::~TModificationBatch() { } } -TMaybe TPartitionSourceManager::TModificationBatch::CanEmit() const { +TMaybe TPartitionSourceManager::TModificationBatch::CanEmitHeartbeat() const { return HeartbeatEmitter.CanEmit(); } @@ -331,13 +331,8 @@ void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TI } } -void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat) { - Batch.HeartbeatEmitter.Process(SourceId, heartbeat); - if (InMemory == MemoryStorage().end()) { - Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, std::move(heartbeat)); - } else { - Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat))); - } +void TPartitionSourceManager::TSourceManager::Update(THeartbeat&& heartbeat) { + Batch.HeartbeatEmitter.Process(SourceId, std::move(heartbeat)); } TPartitionSourceManager::TSourceManager::operator bool() const { diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h index 4899e1598af2..a05a51aa9700 100644 --- a/ydb/core/persqueue/partition_sourcemanager.h +++ b/ydb/core/persqueue/partition_sourcemanager.h @@ -50,7 +50,7 @@ class TPartitionSourceManager { std::optional UpdatedSeqNo() const; void Update(ui64 seqNo, ui64 offset, TInstant timestamp); - void Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat); + void Update(THeartbeat&& heartbeat); operator bool() const; @@ -77,7 +77,7 @@ class TPartitionSourceManager { TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format); ~TModificationBatch(); - TMaybe CanEmit() const; + TMaybe CanEmitHeartbeat() const; TSourceManager GetSource(const TString& id); void Cancel(); diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 204d96b38c94..164e21643b99 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -910,13 +910,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame << " version " << *hbVersion ); - auto heartbeat = THeartbeat{ - .Version = *hbVersion, - .Data = p.Msg.Data, - }; - - sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat)); - + sourceId.Update(THeartbeat{*hbVersion, p.Msg.Data}); return ProcessResult::Continue; } @@ -1188,6 +1182,41 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const } } + if (const auto heartbeat = sourceIdBatch.CanEmitHeartbeat()) { + if (heartbeat->Version > LastEmittedHeartbeat) { + LOG_INFO_S( + ctx, NKikimrServices::PERSQUEUE, + "Topic '" << TopicName() << "' partition " << Partition + << " emit heartbeat " << heartbeat->Version + ); + + auto hbMsg = TWriteMsg{Max() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{ + .SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)), + .SeqNo = 0, // we don't use SeqNo because we disable deduplication + .PartNo = 0, + .TotalParts = 1, + .TotalSize = static_cast(heartbeat->Data.size()), + .CreateTimestamp = CurrentTimestamp.MilliSeconds(), + .ReceiveTimestamp = CurrentTimestamp.MilliSeconds(), + .DisableDeduplication = true, + .WriteTimestamp = CurrentTimestamp.MilliSeconds(), + .Data = heartbeat->Data, + .UncompressedSize = 0, + .PartitionKey = {}, + .ExplicitHashKey = {}, + .External = false, + .IgnoreQuotaDeadline = true, + .HeartbeatVersion = std::nullopt, + }}; + + WriteInflightSize += heartbeat->Data.size(); + auto result = ProcessRequest(hbMsg, parameters, request, ctx); + Y_ABORT_UNLESS(result == ProcessResult::Continue); + + LastEmittedHeartbeat = heartbeat->Version; + } + } + UpdateWriteBufferIsFullState(ctx.Now()); if (!NewHead.Batches.empty() && !NewHead.Batches.back().Packed) { @@ -1385,34 +1414,6 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c } } - if (const auto heartbeat = sourceIdBatch.CanEmit()) { - LOG_INFO_S( - ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicName() << "' partition " << Partition - << " emit heartbeat " << heartbeat->Version - ); - - EmplaceRequest(TWriteMsg{Max() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{ - .SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)), - .SeqNo = 0, // we don't use SeqNo because we disable deduplication - .PartNo = 0, - .TotalParts = 1, - .TotalSize = static_cast(heartbeat->Data.size()), - .CreateTimestamp = CurrentTimestamp.MilliSeconds(), - .ReceiveTimestamp = CurrentTimestamp.MilliSeconds(), - .DisableDeduplication = true, - .WriteTimestamp = CurrentTimestamp.MilliSeconds(), - .Data = heartbeat->Data, - .UncompressedSize = 0, - .PartitionKey = {}, - .ExplicitHashKey = {}, - .External = false, - .IgnoreQuotaDeadline = true, - .HeartbeatVersion = std::nullopt, - }}, ctx); - WriteInflightSize += heartbeat->Data.size(); - } - if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed if (!sourceIdBatch.HasModifications()) { return request->Record.CmdWriteSize() > 0 diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp index 4d124f7eda07..e2176d8c3376 100644 --- a/ydb/core/persqueue/sourceid.cpp +++ b/ydb/core/persqueue/sourceid.cpp @@ -79,13 +79,6 @@ void FillDelete(ui32 partition, const TString& sourceId, TKeyPrefix::EMark mark, void FillDelete(ui32 partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) { FillDelete(partition, sourceId, TKeyPrefix::MarkProtoSourceId, cmd); } -THeartbeatProcessor::THeartbeatProcessor( - const THashSet& sourceIdsWithHeartbeat, - const TMap>& sourceIdsByHeartbeat) - : SourceIdsWithHeartbeat(sourceIdsWithHeartbeat) - , SourceIdsByHeartbeat(sourceIdsByHeartbeat) -{ -} void THeartbeatProcessor::ApplyHeartbeat(const TString& sourceId, const TRowVersion& version) { SourceIdsWithHeartbeat.insert(sourceId); @@ -501,29 +494,32 @@ void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partiti /// THeartbeatEmitter THeartbeatEmitter::THeartbeatEmitter(const TSourceIdStorage& storage) - : THeartbeatProcessor(storage.SourceIdsWithHeartbeat, storage.SourceIdsByHeartbeat) - , Storage(storage) + : Storage(storage) { } -void THeartbeatEmitter::Process(const TString& sourceId, const THeartbeat& heartbeat) { - Y_ABORT_UNLESS(Storage.InMemorySourceIds.contains(sourceId)); - const auto& sourceIdInfo = Storage.InMemorySourceIds.at(sourceId); +void THeartbeatEmitter::Process(const TString& sourceId, THeartbeat&& heartbeat) { + auto it = Storage.InMemorySourceIds.find(sourceId); + if (it != Storage.InMemorySourceIds.end() && it->second.LastHeartbeat) { + if (heartbeat.Version <= it->second.LastHeartbeat->Version) { + return; + } + } - if (const auto& lastHeartbeat = sourceIdInfo.LastHeartbeat) { - ForgetHeartbeat(sourceId, lastHeartbeat->Version); + if (!Storage.SourceIdsWithHeartbeat.contains(sourceId)) { + NewSourceIdsWithHeartbeat.insert(sourceId); } - if (LastHeartbeats.contains(sourceId)) { - ForgetHeartbeat(sourceId, LastHeartbeats.at(sourceId).Version); + if (Heartbeats.contains(sourceId)) { + ForgetHeartbeat(sourceId, Heartbeats.at(sourceId).Version); } ApplyHeartbeat(sourceId, heartbeat.Version); - LastHeartbeats[sourceId] = heartbeat; + Heartbeats[sourceId] = std::move(heartbeat); } TMaybe THeartbeatEmitter::CanEmit() const { - if (SourceIdsWithHeartbeat.size() != Storage.ExplicitSourceIds.size()) { + if (Storage.ExplicitSourceIds.size() != (Storage.SourceIdsWithHeartbeat.size() + NewSourceIdsWithHeartbeat.size())) { return Nothing(); } @@ -531,19 +527,68 @@ TMaybe THeartbeatEmitter::CanEmit() const { return Nothing(); } - auto it = SourceIdsByHeartbeat.begin(); - if (Storage.SourceIdsByHeartbeat.empty() || it->first > Storage.SourceIdsByHeartbeat.begin()->first) { - Y_ABORT_UNLESS(!it->second.empty()); - const auto& someSourceId = *it->second.begin(); + if (!NewSourceIdsWithHeartbeat.empty()) { // just got quorum + if (!Storage.SourceIdsByHeartbeat.empty() && Storage.SourceIdsByHeartbeat.begin()->first < SourceIdsByHeartbeat.begin()->first) { + return GetFromStorage(Storage.SourceIdsByHeartbeat.begin()); + } else { + return GetFromDiff(SourceIdsByHeartbeat.begin()); + } + } else if (SourceIdsByHeartbeat.begin()->first > Storage.SourceIdsByHeartbeat.begin()->first) { + auto storage = Storage.SourceIdsByHeartbeat.begin(); + auto diff = SourceIdsByHeartbeat.begin(); + + TMaybe newVersion; + while (storage != Storage.SourceIdsByHeartbeat.end()) { + const auto& [version, sourceIds] = *storage; + + auto rest = sourceIds.size(); + for (const auto& sourceId : sourceIds) { + auto it = Heartbeats.find(sourceId); + if (it != Heartbeats.end() && it->second.Version > version && version <= diff->first) { + --rest; + } else { + break; + } + } - if (LastHeartbeats.contains(someSourceId)) { - return LastHeartbeats.at(someSourceId); - } else if (Storage.InMemorySourceIds.contains(someSourceId)) { - return Storage.InMemorySourceIds.at(someSourceId).LastHeartbeat; + if (!rest) { + if (++storage != Storage.SourceIdsByHeartbeat.end()) { + newVersion = storage->first; + } else { + newVersion = diff->first; + } + } else { + break; + } + } + + if (newVersion) { + storage = Storage.SourceIdsByHeartbeat.find(*newVersion); + if (storage != Storage.SourceIdsByHeartbeat.end()) { + return GetFromStorage(storage); + } else { + return GetFromDiff(diff); + } } } return Nothing(); } +TMaybe THeartbeatEmitter::GetFromStorage(TSourceIdsByHeartbeat::const_iterator it) const { + Y_ABORT_UNLESS(!it->second.empty()); + const auto& someSourceId = *it->second.begin(); + + Y_ABORT_UNLESS(Storage.InMemorySourceIds.contains(someSourceId)); + return Storage.InMemorySourceIds.at(someSourceId).LastHeartbeat; +} + +TMaybe THeartbeatEmitter::GetFromDiff(TSourceIdsByHeartbeat::const_iterator it) const { + Y_ABORT_UNLESS(!it->second.empty()); + const auto& someSourceId = *it->second.begin(); + + Y_ABORT_UNLESS(Heartbeats.contains(someSourceId)); + return Heartbeats.at(someSourceId); +} + } diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 9ddf494f381a..897014363f29 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -59,19 +59,17 @@ struct TSourceIdInfo { }; // TSourceIdInfo class THeartbeatProcessor { -public: - THeartbeatProcessor() = default; - explicit THeartbeatProcessor( - const THashSet& sourceIdsWithHeartbeat, - const TMap>& sourceIdsByHeartbeat); +protected: + using TSourceIdsByHeartbeat = TMap>; +public: void ApplyHeartbeat(const TString& sourceId, const TRowVersion& version); void ForgetHeartbeat(const TString& sourceId, const TRowVersion& version); void ForgetSourceId(const TString& sourceId); protected: THashSet SourceIdsWithHeartbeat; - TMap> SourceIdsByHeartbeat; + TSourceIdsByHeartbeat SourceIdsByHeartbeat; }; // THeartbeatProcessor @@ -151,12 +149,17 @@ class THeartbeatEmitter: private THeartbeatProcessor { public: explicit THeartbeatEmitter(const TSourceIdStorage& storage); - void Process(const TString& sourceId, const THeartbeat& heartbeat); + void Process(const TString& sourceId, THeartbeat&& heartbeat); TMaybe CanEmit() const; +private: + TMaybe GetFromStorage(TSourceIdsByHeartbeat::const_iterator it) const; + TMaybe GetFromDiff(TSourceIdsByHeartbeat::const_iterator it) const; + private: const TSourceIdStorage& Storage; - THashMap LastHeartbeats; + THashSet NewSourceIdsWithHeartbeat; + THashMap Heartbeats; }; // THeartbeatEmitter diff --git a/ydb/core/persqueue/ut/sourceid_ut.cpp b/ydb/core/persqueue/ut/sourceid_ut.cpp index 07a0a3944bbc..43164c888c50 100644 --- a/ydb/core/persqueue/ut/sourceid_ut.cpp +++ b/ydb/core/persqueue/ut/sourceid_ut.cpp @@ -323,6 +323,143 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { } } + inline static TSourceIdInfo MakeExplicitSourceIdInfo(ui64 offset, const TMaybe& heartbeat = Nothing()) { + auto info = TSourceIdInfo(0, offset, TInstant::Now()); + + info.Explicit = true; + if (heartbeat) { + info.LastHeartbeat = heartbeat; + } + + return info; + } + + inline static THeartbeat MakeHeartbeat(ui64 step) { + return THeartbeat{ + .Version = TRowVersion(step, 0), + .Data = "", + }; + } + + Y_UNIT_TEST(HeartbeatEmitter) { + TSourceIdStorage storage; + ui64 offset = 0; + + // initial info w/o heartbeats + for (ui64 i = 1; i <= 2; ++i) { + storage.RegisterSourceId(TestSourceId(i), MakeExplicitSourceIdInfo(++offset)); + } + { + THeartbeatEmitter emitter(storage); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(1), MakeHeartbeat(1)); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(1), MakeHeartbeat(2)); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(2), MakeHeartbeat(1)); + { + const auto heartbeat = emitter.CanEmit(); + UNIT_ASSERT(heartbeat.Defined()); + UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(1).Version); + } + + emitter.Process(TestSourceId(2), MakeHeartbeat(3)); + { + const auto heartbeat = emitter.CanEmit(); + UNIT_ASSERT(heartbeat.Defined()); + UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(2).Version); + } + } + + // one heartbeat + storage.RegisterSourceId(TestSourceId(1), MakeExplicitSourceIdInfo(+offset, MakeHeartbeat(4))); + { + THeartbeatEmitter emitter(storage); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(2), MakeHeartbeat(3)); + { + const auto heartbeat = emitter.CanEmit(); + UNIT_ASSERT(heartbeat.Defined()); + UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(3).Version); + } + } + { + THeartbeatEmitter emitter(storage); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(2), MakeHeartbeat(5)); + { + const auto heartbeat = emitter.CanEmit(); + UNIT_ASSERT(heartbeat.Defined()); + UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(4).Version); + } + } + + // two different heartbeats + storage.RegisterSourceId(TestSourceId(2), MakeExplicitSourceIdInfo(++offset, MakeHeartbeat(5))); + { + THeartbeatEmitter emitter(storage); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(2), MakeHeartbeat(6)); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(1), MakeHeartbeat(5)); + { + const auto heartbeat = emitter.CanEmit(); + UNIT_ASSERT(heartbeat.Defined()); + UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(5).Version); + } + } + { + THeartbeatEmitter emitter(storage); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(2), MakeHeartbeat(6)); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(1), MakeHeartbeat(7)); + { + const auto heartbeat = emitter.CanEmit(); + UNIT_ASSERT(heartbeat.Defined()); + UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(6).Version); + } + } + + // two same heartbeats + storage.RegisterSourceId(TestSourceId(1), MakeExplicitSourceIdInfo(++offset, MakeHeartbeat(5))); + { + THeartbeatEmitter emitter(storage); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(1), MakeHeartbeat(6)); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(2), MakeHeartbeat(6)); + { + const auto heartbeat = emitter.CanEmit(); + UNIT_ASSERT(heartbeat.Defined()); + UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(6).Version); + } + } + + // can't roll back + { + THeartbeatEmitter emitter(storage); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(1), MakeHeartbeat(4)); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + + emitter.Process(TestSourceId(2), MakeHeartbeat(4)); + UNIT_ASSERT(!emitter.CanEmit().Defined()); + } + } + } // TSourceIdTests } // namespace NKikimr::NPQ