Skip to content

Commit

Permalink
fix snapshot control in case sys view requests (#7824)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 15, 2024
1 parent 27dc824 commit 752095e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 30 deletions.
45 changes: 30 additions & 15 deletions ydb/core/tx/columnshard/inflight_request_tracker.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "inflight_request_tracker.h"
#include "columnshard_impl.h"
#include "columnshard_schema.h"
#include "inflight_request_tracker.h"

#include "data_sharing/common/transactions/tx_extension.h"
#include "engines/column_engine.h"
#include "engines/reader/plain_reader/constructor/read_metadata.h"
Expand All @@ -9,27 +10,26 @@
namespace NKikimr::NColumnShard {

void TInFlightReadsTracker::RemoveInFlightRequest(ui64 cookie, const NOlap::TVersionedIndex* /*index*/, const TInstant now) {
Y_ABORT_UNLESS(RequestsMeta.contains(cookie), "Unknown request cookie %" PRIu64, cookie);
const auto& readMetaList = RequestsMeta[cookie];
auto it = RequestsMeta.find(cookie);
AFL_VERIFY(it != RequestsMeta.end())("cookie", cookie);
const auto& readMetaList = it->second;

for (const auto& readMetaBase : readMetaList) {
NOlap::NReader::NPlain::TReadMetadata::TConstPtr readMeta = std::dynamic_pointer_cast<const NOlap::NReader::NPlain::TReadMetadata>(readMetaBase);

if (!readMeta) {
continue;
}
{
auto it = SnapshotsLive.find(readMeta->GetRequestSnapshot());
auto it = SnapshotsLive.find(readMetaBase->GetRequestSnapshot());
AFL_VERIFY(it != SnapshotsLive.end());
if (it->second.DelRequest(cookie, now)) {
SnapshotsLive.erase(it);
}
}

auto insertStorage = StoragesManager->GetInsertOperator();
auto tracker = insertStorage->GetBlobsTracker();
for (const auto& committedBlob : readMeta->CommittedBlobs) {
tracker->FreeBlob(committedBlob.GetBlobRange().GetBlobId());
if (NOlap::NReader::NPlain::TReadMetadata::TConstPtr readMeta =
std::dynamic_pointer_cast<const NOlap::NReader::NPlain::TReadMetadata>(readMetaBase)) {
auto insertStorage = StoragesManager->GetInsertOperator();
auto tracker = insertStorage->GetBlobsTracker();
for (const auto& committedBlob : readMeta->CommittedBlobs) {
tracker->FreeBlob(committedBlob.GetBlobRange().GetBlobId());
}
}
}
Counters->OnSnapshotsInfo(SnapshotsLive.size(), GetSnapshotToClean());
Expand Down Expand Up @@ -85,8 +85,7 @@ class TTransactionSavePersistentSnapshots: public NOlap::NDataSharing::TExtended
NColumnShard::TColumnShard* self, std::set<NOlap::TSnapshot>&& saveSnapshots, std::set<NOlap::TSnapshot>&& removeSnapshots)
: TBase(self)
, SaveSnapshots(std::move(saveSnapshots))
, RemoveSnapshots(std::move(removeSnapshots))
{
, RemoveSnapshots(std::move(removeSnapshots)) {
AFL_VERIFY(SaveSnapshots.size() || RemoveSnapshots.size());
}
};
Expand Down Expand Up @@ -139,4 +138,20 @@ bool TInFlightReadsTracker::LoadFromDatabase(NTable::TDatabase& tableDB) {
return true;
}

NKikimr::TConclusion<ui64> TInFlightReadsTracker::AddInFlightRequest(
NOlap::NReader::TReadMetadataBase::TConstPtr readMeta, const NOlap::TVersionedIndex* index) {
const ui64 cookie = NextCookie++;
auto it = SnapshotsLive.find(readMeta->GetRequestSnapshot());
if (it == SnapshotsLive.end()) {
it = SnapshotsLive.emplace(readMeta->GetRequestSnapshot(), TSnapshotLiveInfo::BuildFromRequest(readMeta->GetRequestSnapshot())).first;
Counters->OnSnapshotsInfo(SnapshotsLive.size(), GetSnapshotToClean());
}
it->second.AddRequest(cookie);
auto status = AddToInFlightRequest(cookie, readMeta, index);
if (!status) {
return status;
}
return cookie;
}

} // namespace NKikimr::NColumnShard
16 changes: 1 addition & 15 deletions ydb/core/tx/columnshard/inflight_request_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,7 @@ class TInFlightReadsTracker {

// Returns a unique cookie associated with this request
[[nodiscard]] TConclusion<ui64> AddInFlightRequest(
NOlap::NReader::TReadMetadataBase::TConstPtr readMeta, const NOlap::TVersionedIndex* index) {
const ui64 cookie = NextCookie++;
auto it = SnapshotsLive.find(readMeta->GetRequestSnapshot());
if (it == SnapshotsLive.end()) {
it =
SnapshotsLive.emplace(readMeta->GetRequestSnapshot(), TSnapshotLiveInfo::BuildFromRequest(readMeta->GetRequestSnapshot())).first;
Counters->OnSnapshotsInfo(SnapshotsLive.size(), GetSnapshotToClean());
}
it->second.AddRequest(cookie);
auto status = AddToInFlightRequest(cookie, readMeta, index);
if (!status) {
return status;
}
return cookie;
}
NOlap::NReader::TReadMetadataBase::TConstPtr readMeta, const NOlap::TVersionedIndex* index);

void RemoveInFlightRequest(ui64 cookie, const NOlap::TVersionedIndex* index, const TInstant now);

Expand Down

0 comments on commit 752095e

Please sign in to comment.