Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize heartbeats emission KIKIMR-20392 #557

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui
, NumChannels(numChannels)
, WriteBufferIsFullCounter(nullptr)
, WriteLagMs(TDuration::Minutes(1), 100)
, LastEmittedHeartbeat(TRowVersion::Min())
{
TabletCounters.Populate(Counters);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
TInstant LastUsedStorageMeterTimestamp;

TDeque<std::unique_ptr<IEventBase>> PendingEvents;
TRowVersion LastEmittedHeartbeat;
};

} // namespace NKikimr::NPQ
Expand Down
11 changes: 3 additions & 8 deletions ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ TPartitionSourceManager::TModificationBatch::~TModificationBatch() {
}
}

TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmit() const {
// Returns the result of HeartbeatEmitter.CanEmit() unchanged.
// NOTE(review): presumably an empty TMaybe means there is nothing to emit yet;
// confirm against the emitter's implementation.
TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmitHeartbeat() const {
return HeartbeatEmitter.CanEmit();
}

Expand Down Expand Up @@ -331,13 +331,8 @@ void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TI
}
}

void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat) {
Batch.HeartbeatEmitter.Process(SourceId, heartbeat);
if (InMemory == MemoryStorage().end()) {
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, std::move(heartbeat));
} else {
Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat)));
}
// Passes the heartbeat to the batch's heartbeat emitter together with this source's id.
// The heartbeat is forwarded with std::move, so the caller's object must not be relied on afterwards.
void TPartitionSourceManager::TSourceManager::Update(THeartbeat&& heartbeat) {
Batch.HeartbeatEmitter.Process(SourceId, std::move(heartbeat));
}

TPartitionSourceManager::TSourceManager::operator bool() const {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/partition_sourcemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TPartitionSourceManager {
std::optional<ui64> UpdatedSeqNo() const;

void Update(ui64 seqNo, ui64 offset, TInstant timestamp);
void Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat);
void Update(THeartbeat&& heartbeat);

operator bool() const;

Expand All @@ -77,7 +77,7 @@ class TPartitionSourceManager {
TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format);
~TModificationBatch();

TMaybe<THeartbeat> CanEmit() const;
TMaybe<THeartbeat> CanEmitHeartbeat() const;
TSourceManager GetSource(const TString& id);

void Cancel();
Expand Down
71 changes: 36 additions & 35 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -910,13 +910,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
<< " version " << *hbVersion
);

auto heartbeat = THeartbeat{
.Version = *hbVersion,
.Data = p.Msg.Data,
};

sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat));

sourceId.Update(THeartbeat{*hbVersion, p.Msg.Data});
return ProcessResult::Continue;
}

Expand Down Expand Up @@ -1188,6 +1182,41 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
}
}

if (const auto heartbeat = sourceIdBatch.CanEmitHeartbeat()) {
if (heartbeat->Version > LastEmittedHeartbeat) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"Topic '" << TopicName() << "' partition " << Partition
<< " emit heartbeat " << heartbeat->Version
);

auto hbMsg = TWriteMsg{Max<ui64>() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{
.SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)),
.SeqNo = 0, // we don't use SeqNo because we disable deduplication
.PartNo = 0,
.TotalParts = 1,
.TotalSize = static_cast<ui32>(heartbeat->Data.size()),
.CreateTimestamp = CurrentTimestamp.MilliSeconds(),
.ReceiveTimestamp = CurrentTimestamp.MilliSeconds(),
.DisableDeduplication = true,
.WriteTimestamp = CurrentTimestamp.MilliSeconds(),
.Data = heartbeat->Data,
.UncompressedSize = 0,
.PartitionKey = {},
.ExplicitHashKey = {},
.External = false,
.IgnoreQuotaDeadline = true,
.HeartbeatVersion = std::nullopt,
}};

WriteInflightSize += heartbeat->Data.size();
auto result = ProcessRequest(hbMsg, parameters, request, ctx);
Y_ABORT_UNLESS(result == ProcessResult::Continue);

LastEmittedHeartbeat = heartbeat->Version;
}
}

UpdateWriteBufferIsFullState(ctx.Now());

if (!NewHead.Batches.empty() && !NewHead.Batches.back().Packed) {
Expand Down Expand Up @@ -1385,34 +1414,6 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c
}
}

if (const auto heartbeat = sourceIdBatch.CanEmit()) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"Topic '" << TopicName() << "' partition " << Partition
<< " emit heartbeat " << heartbeat->Version
);

EmplaceRequest(TWriteMsg{Max<ui64>() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{
.SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)),
.SeqNo = 0, // we don't use SeqNo because we disable deduplication
.PartNo = 0,
.TotalParts = 1,
.TotalSize = static_cast<ui32>(heartbeat->Data.size()),
.CreateTimestamp = CurrentTimestamp.MilliSeconds(),
.ReceiveTimestamp = CurrentTimestamp.MilliSeconds(),
.DisableDeduplication = true,
.WriteTimestamp = CurrentTimestamp.MilliSeconds(),
.Data = heartbeat->Data,
.UncompressedSize = 0,
.PartitionKey = {},
.ExplicitHashKey = {},
.External = false,
.IgnoreQuotaDeadline = true,
.HeartbeatVersion = std::nullopt,
}}, ctx);
WriteInflightSize += heartbeat->Data.size();
}

if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed
if (!sourceIdBatch.HasModifications()) {
return request->Record.CmdWriteSize() > 0
Expand Down
97 changes: 71 additions & 26 deletions ydb/core/persqueue/sourceid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,6 @@ void FillDelete(ui32 partition, const TString& sourceId, TKeyPrefix::EMark mark,
// Overload without an explicit mark: forwards to the mark-taking FillDelete,
// passing TKeyPrefix::MarkProtoSourceId as the mark and all other arguments unchanged.
void FillDelete(ui32 partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) {
FillDelete(partition, sourceId, TKeyPrefix::MarkProtoSourceId, cmd);
}
// Initializes the processor's two heartbeat indexes from the supplied collections:
//   sourceIdsWithHeartbeat - initial value for SourceIdsWithHeartbeat
//   sourceIdsByHeartbeat   - initial value for SourceIdsByHeartbeat
// NOTE(review): the members look like by-value containers, so this copies both arguments — confirm.
THeartbeatProcessor::THeartbeatProcessor(
const THashSet<TString>& sourceIdsWithHeartbeat,
const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat)
: SourceIdsWithHeartbeat(sourceIdsWithHeartbeat)
, SourceIdsByHeartbeat(sourceIdsByHeartbeat)
{
}

void THeartbeatProcessor::ApplyHeartbeat(const TString& sourceId, const TRowVersion& version) {
SourceIdsWithHeartbeat.insert(sourceId);
Expand Down Expand Up @@ -501,49 +494,101 @@ void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partiti

/// THeartbeatEmitter
THeartbeatEmitter::THeartbeatEmitter(const TSourceIdStorage& storage)
: THeartbeatProcessor(storage.SourceIdsWithHeartbeat, storage.SourceIdsByHeartbeat)
, Storage(storage)
: Storage(storage)
{
}

void THeartbeatEmitter::Process(const TString& sourceId, const THeartbeat& heartbeat) {
Y_ABORT_UNLESS(Storage.InMemorySourceIds.contains(sourceId));
const auto& sourceIdInfo = Storage.InMemorySourceIds.at(sourceId);
void THeartbeatEmitter::Process(const TString& sourceId, THeartbeat&& heartbeat) {
auto it = Storage.InMemorySourceIds.find(sourceId);
if (it != Storage.InMemorySourceIds.end() && it->second.LastHeartbeat) {
if (heartbeat.Version <= it->second.LastHeartbeat->Version) {
return;
}
}

if (const auto& lastHeartbeat = sourceIdInfo.LastHeartbeat) {
ForgetHeartbeat(sourceId, lastHeartbeat->Version);
if (!Storage.SourceIdsWithHeartbeat.contains(sourceId)) {
NewSourceIdsWithHeartbeat.insert(sourceId);
}

if (LastHeartbeats.contains(sourceId)) {
ForgetHeartbeat(sourceId, LastHeartbeats.at(sourceId).Version);
if (Heartbeats.contains(sourceId)) {
ForgetHeartbeat(sourceId, Heartbeats.at(sourceId).Version);
}

ApplyHeartbeat(sourceId, heartbeat.Version);
LastHeartbeats[sourceId] = heartbeat;
Heartbeats[sourceId] = std::move(heartbeat);
}

TMaybe<THeartbeat> THeartbeatEmitter::CanEmit() const {
if (SourceIdsWithHeartbeat.size() != Storage.ExplicitSourceIds.size()) {
if (Storage.ExplicitSourceIds.size() != (Storage.SourceIdsWithHeartbeat.size() + NewSourceIdsWithHeartbeat.size())) {
return Nothing();
}

if (SourceIdsByHeartbeat.empty()) {
return Nothing();
}

auto it = SourceIdsByHeartbeat.begin();
if (Storage.SourceIdsByHeartbeat.empty() || it->first > Storage.SourceIdsByHeartbeat.begin()->first) {
Y_ABORT_UNLESS(!it->second.empty());
const auto& someSourceId = *it->second.begin();
if (!NewSourceIdsWithHeartbeat.empty()) { // just got quorum
if (!Storage.SourceIdsByHeartbeat.empty() && Storage.SourceIdsByHeartbeat.begin()->first < SourceIdsByHeartbeat.begin()->first) {
return GetFromStorage(Storage.SourceIdsByHeartbeat.begin());
} else {
return GetFromDiff(SourceIdsByHeartbeat.begin());
}
} else if (SourceIdsByHeartbeat.begin()->first > Storage.SourceIdsByHeartbeat.begin()->first) {
auto storage = Storage.SourceIdsByHeartbeat.begin();
auto diff = SourceIdsByHeartbeat.begin();

TMaybe<TRowVersion> newVersion;
while (storage != Storage.SourceIdsByHeartbeat.end()) {
const auto& [version, sourceIds] = *storage;

auto rest = sourceIds.size();
for (const auto& sourceId : sourceIds) {
auto it = Heartbeats.find(sourceId);
if (it != Heartbeats.end() && it->second.Version > version && version <= diff->first) {
--rest;
} else {
break;
}
}

if (LastHeartbeats.contains(someSourceId)) {
return LastHeartbeats.at(someSourceId);
} else if (Storage.InMemorySourceIds.contains(someSourceId)) {
return Storage.InMemorySourceIds.at(someSourceId).LastHeartbeat;
if (!rest) {
if (++storage != Storage.SourceIdsByHeartbeat.end()) {
newVersion = storage->first;
} else {
newVersion = diff->first;
}
} else {
break;
}
}

if (newVersion) {
storage = Storage.SourceIdsByHeartbeat.find(*newVersion);
if (storage != Storage.SourceIdsByHeartbeat.end()) {
return GetFromStorage(storage);
} else {
return GetFromDiff(diff);
}
}
}

return Nothing();
}

// Looks up a heartbeat in the storage for the entry `it` points at.
//
// `it` must reference an entry whose source id set is non-empty; the first source id of
// that set is used for the lookup. The id must be present in Storage.InMemorySourceIds.
// Both preconditions are enforced with Y_ABORT_UNLESS.
//
// Returns the LastHeartbeat field of the matching in-memory source id record as-is.
TMaybe<THeartbeat> THeartbeatEmitter::GetFromStorage(TSourceIdsByHeartbeat::const_iterator it) const {
    Y_ABORT_UNLESS(!it->second.empty());

    // Only the first source id of the set is consulted.
    const auto& someSourceId = *it->second.begin();
    Y_ABORT_UNLESS(Storage.InMemorySourceIds.contains(someSourceId));

    const auto& storedInfo = Storage.InMemorySourceIds.at(someSourceId);
    return storedInfo.LastHeartbeat;
}

// Looks up a heartbeat among the ones collected by this emitter (the Heartbeats map)
// for the entry `it` points at.
//
// `it` must reference an entry whose source id set is non-empty; the first source id of
// that set is used for the lookup. The id must be present in Heartbeats.
// Both preconditions are enforced with Y_ABORT_UNLESS.
//
// Returns a copy of the heartbeat recorded for that source id.
TMaybe<THeartbeat> THeartbeatEmitter::GetFromDiff(TSourceIdsByHeartbeat::const_iterator it) const {
    Y_ABORT_UNLESS(!it->second.empty());

    // Only the first source id of the set is consulted.
    const auto& someSourceId = *it->second.begin();
    Y_ABORT_UNLESS(Heartbeats.contains(someSourceId));

    const auto& pendingHeartbeat = Heartbeats.at(someSourceId);
    return pendingHeartbeat;
}

}
19 changes: 11 additions & 8 deletions ydb/core/persqueue/sourceid.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,17 @@ struct TSourceIdInfo {
}; // TSourceIdInfo

class THeartbeatProcessor {
public:
THeartbeatProcessor() = default;
explicit THeartbeatProcessor(
const THashSet<TString>& sourceIdsWithHeartbeat,
const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat);
protected:
using TSourceIdsByHeartbeat = TMap<TRowVersion, THashSet<TString>>;

public:
void ApplyHeartbeat(const TString& sourceId, const TRowVersion& version);
void ForgetHeartbeat(const TString& sourceId, const TRowVersion& version);
void ForgetSourceId(const TString& sourceId);

protected:
THashSet<TString> SourceIdsWithHeartbeat;
TMap<TRowVersion, THashSet<TString>> SourceIdsByHeartbeat;
TSourceIdsByHeartbeat SourceIdsByHeartbeat;

}; // THeartbeatProcessor

Expand Down Expand Up @@ -151,12 +149,17 @@ class THeartbeatEmitter: private THeartbeatProcessor {
public:
explicit THeartbeatEmitter(const TSourceIdStorage& storage);

void Process(const TString& sourceId, const THeartbeat& heartbeat);
void Process(const TString& sourceId, THeartbeat&& heartbeat);
TMaybe<THeartbeat> CanEmit() const;

private:
TMaybe<THeartbeat> GetFromStorage(TSourceIdsByHeartbeat::const_iterator it) const;
TMaybe<THeartbeat> GetFromDiff(TSourceIdsByHeartbeat::const_iterator it) const;

private:
const TSourceIdStorage& Storage;
THashMap<TString, THeartbeat> LastHeartbeats;
THashSet<TString> NewSourceIdsWithHeartbeat;
THashMap<TString, THeartbeat> Heartbeats;

}; // THeartbeatEmitter

Expand Down
Loading