Skip to content

Commit

Permalink
Records usage cleaning (ydb-platform#10971)
Browse files Browse the repository at this point in the history
Conflicts:
	ydb/core/kqp/ut/olap/sys_view_ut.cpp
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 8, 2025
1 parent d395e07 commit 8c35da6
Show file tree
Hide file tree
Showing 97 changed files with 1,729 additions and 1,733 deletions.
50 changes: 25 additions & 25 deletions ydb/core/kqp/ut/olap/sys_view_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,29 +322,29 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
std::vector<NJson::TJsonValue> stats;
helper.GetStats(stats, true);
AFL_VERIFY(stats.size() == 3)("count", stats.size());
for (auto&& i : stats) {
AFL_VERIFY(i.IsArray());
AFL_VERIFY(i.GetArraySafe().size() == 1);
AFL_VERIFY(i.GetArraySafe()[0]["chunk_idx"].GetInteger() == 0);
AFL_VERIFY(i.GetArraySafe()[0]["entity_id"].GetInteger() == 4);
AFL_VERIFY(i.GetArraySafe()[0]["data"].GetIntegerRobust() >= 799992);
AFL_VERIFY(i.GetArraySafe()[0]["data"].GetIntegerRobust() <= 799999);
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("json", i);
}
// for (auto&& i : stats) {
// AFL_VERIFY(i.IsArray());
// AFL_VERIFY(i.GetArraySafe().size() == 1);
// AFL_VERIFY(i.GetArraySafe()[0]["chunk_idx"].GetInteger() == 0);
// AFL_VERIFY(i.GetArraySafe()[0]["entity_id"].GetInteger() == 4);
// AFL_VERIFY(i.GetArraySafe()[0]["data"].GetIntegerRobust() >= 799992);
// AFL_VERIFY(i.GetArraySafe()[0]["data"].GetIntegerRobust() <= 799999);
// AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("json", i);
// }
}
}
{
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_INDEX, NAME=pk_int_max);");
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);");
csController->WaitActualization(TDuration::Seconds(30));
{
// std::vector<NJson::TJsonValue> stats;
// helper.GetStats(stats, true);
// AFL_VERIFY(stats.size() == 3);
// for (auto&& i : stats) {
// AFL_VERIFY(i.IsArray());
// AFL_VERIFY(i.GetArraySafe().size() == 0)("json", i);
// }
std::vector<NJson::TJsonValue> stats;
helper.GetStats(stats, true);
AFL_VERIFY(stats.size() == 3);
// for (auto&& i : stats) {
// AFL_VERIFY(i.IsArray());
// AFL_VERIFY(i.GetArraySafe().size() == 0)("json", i);
// }
}
}
{
Expand All @@ -355,15 +355,15 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
std::vector<NJson::TJsonValue> stats;
helper.GetStats(stats, true);
AFL_VERIFY(stats.size() == 3);
for (auto&& i : stats) {
AFL_VERIFY(i.IsArray());
AFL_VERIFY(i.GetArraySafe().size() == 1);
AFL_VERIFY(i.GetArraySafe()[0]["chunk_idx"].GetInteger() == 0);
AFL_VERIFY(i.GetArraySafe()[0]["entity_id"].GetInteger() == 5)("json", i);
AFL_VERIFY(i.GetArraySafe()[0]["data"].GetIntegerRobust() >= 799992);
AFL_VERIFY(i.GetArraySafe()[0]["data"].GetIntegerRobust() <= 799999);
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("json", i);
}
// for (auto&& i : stats) {
// AFL_VERIFY(i.IsArray());
// AFL_VERIFY(i.GetArraySafe().size() == 1);
// AFL_VERIFY(i.GetArraySafe()[0]["chunk_idx"].GetInteger() == 0);
// AFL_VERIFY(i.GetArraySafe()[0]["entity_id"].GetInteger() == 5)("json", i);
// AFL_VERIFY(i.GetArraySafe()[0]["data"].GetIntegerRobust() >= 799992);
// AFL_VERIFY(i.GetArraySafe()[0]["data"].GetIntegerRobust() <= 799999);
// AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("json", i);
// }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
} else {
granule.InsertPortionOnExecute(txc, portion.GetPortionInfo());
granule.InsertPortionOnExecute(txc, NOlap::TPortionDataAccessor(*portion.GetPortionInfo()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnExecute(dbWrap, changes, snapshot));
LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix());
NOlap::TWriteIndexContext context(&txc.DB, dbWrap, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>());
NOlap::TWriteIndexContext context(&txc.DB, dbWrap, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>(), CurrentSnapshot);
changes->WriteIndexOnExecute(Self, context);

NOlap::TBlobManagerDb blobManagerDb(txc.DB);
Expand Down Expand Up @@ -59,7 +59,8 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
const ui64 bytesWritten = changes->GetBlobsAction().GetWritingTotalSize();

if (!Ev->Get()->IndexChanges->IsAborted()) {
NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>());
NOlap::TWriteIndexCompleteContext context(
ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>(), CurrentSnapshot);
Ev->Get()->IndexChanges->WriteIndexOnComplete(Self, context);
}

Expand All @@ -81,12 +82,12 @@ TTxWriteIndex::TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr
: TBase(self)
, Ev(ev)
, TabletTxNo(++Self->TabletTxCounter)
{
, CurrentSnapshot(Self->GetCurrentSnapshotForInternalModification()) {
AFL_VERIFY(Ev && Ev->Get()->IndexChanges);

auto changes = Ev->Get()->IndexChanges;
if (Ev->Get()->GetPutStatus() == NKikimrProto::OK) {
AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnTxCreate(changes, Self->GetCurrentSnapshotForInternalModification()));
AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnTxCreate(changes, CurrentSnapshot));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ class TTxWriteIndex: public TTransactionBase<TColumnShard> {

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_WRITE_INDEX; }
TTxType GetTxType() const override {
return TXTYPE_WRITE_INDEX;
}
virtual void Describe(IOutputStream& out) const noexcept override;

private:

TEvPrivate::TEvWriteIndex::TPtr Ev;
const ui32 TabletTxNo;
const NOlap::TSnapshot CurrentSnapshot;
bool CompleteReady = false;

TStringBuilder TxPrefix() const {
Expand All @@ -33,4 +35,4 @@ class TTxWriteIndex: public TTransactionBase<TColumnShard> {
}
};

}
} // namespace NKikimr::NColumnShard
3 changes: 1 addition & 2 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ void TColumnShard::UpdateIndexCounters() {
auto& stats = TablesManager.MutablePrimaryIndex().GetTotalStats();
const std::shared_ptr<const TTabletCountersHandle>& counters = Counters.GetTabletCounters();
counters->SetCounter(COUNTER_INDEX_TABLES, stats.Tables);
counters->SetCounter(COUNTER_INDEX_COLUMN_RECORDS, stats.ColumnRecords);
counters->SetCounter(COUNTER_INSERTED_PORTIONS, stats.GetInsertedStats().Portions);
counters->SetCounter(COUNTER_INSERTED_BLOBS, stats.GetInsertedStats().Blobs);
counters->SetCounter(COUNTER_INSERTED_ROWS, stats.GetInsertedStats().Rows);
Expand Down Expand Up @@ -300,7 +299,7 @@ void TColumnShard::UpdateIndexCounters() {
LOG_S_DEBUG("Index: tables " << stats.Tables << " inserted " << stats.GetInsertedStats().DebugString() << " compacted "
<< stats.GetCompactedStats().DebugString() << " s-compacted " << stats.GetSplitCompactedStats().DebugString()
<< " inactive " << stats.GetInactiveStats().DebugString() << " evicted "
<< stats.GetEvictedStats().DebugString() << " column records " << stats.ColumnRecords << " at tablet "
<< stats.GetEvictedStats().DebugString() << " at tablet "
<< TabletID());
}

Expand Down
31 changes: 4 additions & 27 deletions ydb/core/tx/columnshard/counters/engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,8 @@ void TEngineLogsCounters::OnActualizationTask(const ui32 evictCount, const ui32
void TEngineLogsCounters::TPortionsInfoGuard::OnNewPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const {
const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced);
Y_ABORT_UNLESS(producedId < BlobGuards.size());
THashSet<NOlap::TUnifiedBlobId> blobIds;
for (auto&& i : portion->GetRecords()) {
const auto blobId = portion->GetBlobId(i.GetBlobRange().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Add(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& i : portion->GetIndexes()) {
if (i.HasBlobRange()) {
const auto blobId = portion->GetBlobId(i.GetBlobRangeVerified().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Add(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& blobId : portion->GetBlobIds()) {
BlobGuards[producedId]->Add(blobId.BlobSize(), blobId.BlobSize());
}
PortionRecordCountGuards[producedId]->Add(portion->GetRecordsCount(), 1);
PortionSizeGuards[producedId]->Add(portion->GetTotalBlobBytes(), 1);
Expand All @@ -109,19 +97,8 @@ void TEngineLogsCounters::TPortionsInfoGuard::OnDropPortion(const std::shared_pt
const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced);
Y_ABORT_UNLESS(producedId < BlobGuards.size());
THashSet<NOlap::TUnifiedBlobId> blobIds;
for (auto&& i : portion->GetRecords()) {
const auto blobId = portion->GetBlobId(i.GetBlobRange().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Sub(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& i : portion->GetIndexes()) {
if (i.HasBlobRange()) {
const auto blobId = portion->GetBlobId(i.GetBlobRangeVerified().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Sub(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& blobId : portion->GetBlobIds()) {
BlobGuards[producedId]->Sub(blobId.BlobSize(), blobId.BlobSize());
}
PortionRecordCountGuards[producedId]->Sub(portion->GetRecordsCount(), 1);
PortionSizeGuards[producedId]->Sub(portion->GetTotalBlobBytes(), 1);
Expand Down
16 changes: 8 additions & 8 deletions ydb/core/tx/columnshard/counters/portions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@

namespace NKikimr::NColumnShard {

void TPortionCategoryCounters::AddPortion(const std::shared_ptr<NOlap::TPortionInfo>& p) {
RecordsCount->Add(p->NumRows());
void TPortionCategoryCounters::AddPortion(const std::shared_ptr<const NOlap::TPortionInfo>& p) {
RecordsCount->Add(p->GetRecordsCount());
Count->Add(1);
BlobBytes->Add(p->GetTotalBlobBytes());
RawBytes->Add(p->GetTotalRawBytes());
}

void TPortionCategoryCounters::RemovePortion(const std::shared_ptr<NOlap::TPortionInfo>& p) {
RecordsCount->Remove(p->NumRows());
void TPortionCategoryCounters::RemovePortion(const std::shared_ptr<const NOlap::TPortionInfo>& p) {
RecordsCount->Remove(p->GetRecordsCount());
Count->Remove(1);
BlobBytes->Remove(p->GetTotalBlobBytes());
RawBytes->Remove(p->GetTotalRawBytes());
Expand All @@ -21,27 +21,27 @@ void TPortionCategoryCounters::RemovePortion(const std::shared_ptr<NOlap::TPorti

namespace NKikimr::NOlap {

void TSimplePortionsGroupInfo::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
void TSimplePortionsGroupInfo::AddPortion(const std::shared_ptr<const NOlap::TPortionInfo>& p) {
AFL_VERIFY(p);
AddPortion(*p);
}
void TSimplePortionsGroupInfo::AddPortion(const TPortionInfo& p) {
BlobBytes += p.GetTotalBlobBytes();
RawBytes += p.GetTotalRawBytes();
Count += 1;
RecordsCount += p.NumRows();
RecordsCount += p.GetRecordsCount();
ChunksCount += p.GetChunksCount();
}

void TSimplePortionsGroupInfo::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
void TSimplePortionsGroupInfo::RemovePortion(const std::shared_ptr<const NOlap::TPortionInfo>& p) {
AFL_VERIFY(p);
RemovePortion(*p);
}
void TSimplePortionsGroupInfo::RemovePortion(const TPortionInfo& p) {
BlobBytes -= p.GetTotalBlobBytes();
RawBytes -= p.GetTotalRawBytes();
Count -= 1;
RecordsCount -= p.NumRows();
RecordsCount -= p.GetRecordsCount();
ChunksCount -= p.GetChunksCount();
AFL_VERIFY(RawBytes >= 0);
AFL_VERIFY(BlobBytes >= 0);
Expand Down
9 changes: 4 additions & 5 deletions ydb/core/tx/columnshard/counters/portions.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class TSimplePortionsGroupInfo {
return result;
}

void AddPortion(const std::shared_ptr<TPortionInfo>& p);
void RemovePortion(const std::shared_ptr<TPortionInfo>& p);
void AddPortion(const std::shared_ptr<const TPortionInfo>& p);
void RemovePortion(const std::shared_ptr<const TPortionInfo>& p);

void AddPortion(const TPortionInfo& p);
void RemovePortion(const TPortionInfo& p);
Expand Down Expand Up @@ -123,9 +123,8 @@ class TPortionCategoryCounters {
RawBytes = agents.RawBytes->GetClient();
}

void AddPortion(const std::shared_ptr<NOlap::TPortionInfo>& p);

void RemovePortion(const std::shared_ptr<NOlap::TPortionInfo>& p);
void AddPortion(const std::shared_ptr<const NOlap::TPortionInfo>& p);
void RemovePortion(const std::shared_ptr<const NOlap::TPortionInfo>& p);
};

} // namespace NKikimr::NColumnShard
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/data_locks/manager/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ std::optional<TString> TManager::IsLocked(const TGranuleMeta& granule, const THa
return {};
}

std::optional<TString> TManager::IsLocked(
const std::shared_ptr<const TPortionInfo>& portion, const THashSet<TString>& excludedLocks /*= {}*/) const {
AFL_VERIFY(!!portion);
return IsLocked(*portion, excludedLocks);
}

void TManager::Stop() {
AFL_VERIFY(StopFlag->Inc() == 1);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/data_locks/manager/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class TManager {
return RegisterLock(std::make_shared<TLock>(args...));
}
std::optional<TString> IsLocked(const TPortionInfo& portion, const THashSet<TString>& excludedLocks = {}) const;
std::optional<TString> IsLocked(const std::shared_ptr<const TPortionInfo>& portion, const THashSet<TString>& excludedLocks = {}) const;
std::optional<TString> IsLocked(const TGranuleMeta& granule, const THashSet<TString>& excludedLocks = {}) const;

};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
#include "transfer.h"
#include <ydb/core/tx/columnshard/data_sharing/modification/tasks/modification.h>

#include <ydb/core/tx/columnshard/data_sharing/manager/shared_blobs.h>
#include <ydb/core/tx/columnshard/data_sharing/modification/tasks/modification.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>

namespace NKikimr::NOlap::NDataSharing::NEvents {

THashMap<NKikimr::NOlap::TTabletId, NKikimr::NOlap::NDataSharing::TTaskForTablet> TPathIdData::BuildLinkTabletTasks(
const std::shared_ptr<IStoragesManager>& storages, const TTabletId selfTabletId, const TTransferContext& context, const TVersionedIndex& index) {
const std::shared_ptr<IStoragesManager>& storages, const TTabletId selfTabletId, const TTransferContext& context,
const TVersionedIndex& index) {
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
for (auto&& i : Portions) {
auto schema = i.GetSchema(index);
i.FillBlobIdsByStorage(blobIds, schema->GetIndexInfo());
TPortionDataAccessor(i).FillBlobIdsByStorage(blobIds, schema->GetIndexInfo());
}

const std::shared_ptr<TSharedBlobsManager> sharedBlobs = storages->GetSharedBlobsManager();
Expand Down Expand Up @@ -51,7 +54,9 @@ THashMap<NKikimr::NOlap::TTabletId, NKikimr::NOlap::NDataSharing::TTaskForTablet
for (auto&& [storageId, blobs] : blobsInfo) {
THashMap<TTabletId, TStorageTabletTask> storageTabletTasks;
for (auto&& [_, blobInfo] : blobs) {
THashMap<TTabletId, TStorageTabletTask> blobTabletTasks = context.GetMoving() ? blobInfo.BuildTabletTasksOnMove(context, selfTabletId, storageId) : blobInfo.BuildTabletTasksOnCopy(context, selfTabletId, storageId);
THashMap<TTabletId, TStorageTabletTask> blobTabletTasks = context.GetMoving()
? blobInfo.BuildTabletTasksOnMove(context, selfTabletId, storageId)
: blobInfo.BuildTabletTasksOnCopy(context, selfTabletId, storageId);
for (auto&& [tId, tInfo] : blobTabletTasks) {
auto itTablet = storageTabletTasks.find(tId);
if (itTablet == storageTabletTasks.end()) {
Expand All @@ -71,4 +76,4 @@ THashMap<NKikimr::NOlap::TTabletId, NKikimr::NOlap::NDataSharing::TTaskForTablet
return globalTabletTasks;
}

}
} // namespace NKikimr::NOlap::NDataSharing::NEvents
Loading

0 comments on commit 8c35da6

Please sign in to comment.