Skip to content

Commit

Permalink
Merge 59f18af into d4eaec4
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Oct 28, 2024
2 parents d4eaec4 + 59f18af commit 7ca8135
Show file tree
Hide file tree
Showing 62 changed files with 1,352 additions and 1,423 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
} else {
granule.InsertPortionOnExecute(txc, portion.GetPortionInfo());
granule.InsertPortionOnExecute(txc, TPortionDataAccessor(*portion.GetPortionInfo()));
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ void TColumnShard::UpdateIndexCounters() {
auto& stats = TablesManager.MutablePrimaryIndex().GetTotalStats();
const std::shared_ptr<const TTabletCountersHandle>& counters = Counters.GetTabletCounters();
counters->SetCounter(COUNTER_INDEX_TABLES, stats.Tables);
counters->SetCounter(COUNTER_INDEX_COLUMN_RECORDS, stats.ColumnRecords);
counters->SetCounter(COUNTER_INSERTED_PORTIONS, stats.GetInsertedStats().Portions);
counters->SetCounter(COUNTER_INSERTED_BLOBS, stats.GetInsertedStats().Blobs);
counters->SetCounter(COUNTER_INSERTED_ROWS, stats.GetInsertedStats().Rows);
Expand Down Expand Up @@ -300,7 +299,7 @@ void TColumnShard::UpdateIndexCounters() {
LOG_S_DEBUG("Index: tables " << stats.Tables << " inserted " << stats.GetInsertedStats().DebugString() << " compacted "
<< stats.GetCompactedStats().DebugString() << " s-compacted " << stats.GetSplitCompactedStats().DebugString()
<< " inactive " << stats.GetInactiveStats().DebugString() << " evicted "
<< stats.GetEvictedStats().DebugString() << " column records " << stats.ColumnRecords << " at tablet "
<< stats.GetEvictedStats().DebugString() << " at tablet "
<< TabletID());
}

Expand Down
31 changes: 4 additions & 27 deletions ydb/core/tx/columnshard/counters/engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,8 @@ void TEngineLogsCounters::OnActualizationTask(const ui32 evictCount, const ui32
void TEngineLogsCounters::TPortionsInfoGuard::OnNewPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const {
const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced);
Y_ABORT_UNLESS(producedId < BlobGuards.size());
THashSet<NOlap::TUnifiedBlobId> blobIds;
for (auto&& i : portion->GetRecords()) {
const auto blobId = portion->GetBlobId(i.GetBlobRange().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Add(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& i : portion->GetIndexes()) {
if (i.HasBlobRange()) {
const auto blobId = portion->GetBlobId(i.GetBlobRangeVerified().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Add(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& blobId : portion->GetBlobIds()) {
BlobGuards[producedId]->Add(blobId.BlobSize(), blobId.BlobSize());
}
PortionRecordCountGuards[producedId]->Add(portion->GetRecordsCount(), 1);
PortionSizeGuards[producedId]->Add(portion->GetTotalBlobBytes(), 1);
Expand All @@ -109,19 +97,8 @@ void TEngineLogsCounters::TPortionsInfoGuard::OnDropPortion(const std::shared_pt
const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced);
Y_ABORT_UNLESS(producedId < BlobGuards.size());
THashSet<NOlap::TUnifiedBlobId> blobIds;
for (auto&& i : portion->GetRecords()) {
const auto blobId = portion->GetBlobId(i.GetBlobRange().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Sub(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& i : portion->GetIndexes()) {
if (i.HasBlobRange()) {
const auto blobId = portion->GetBlobId(i.GetBlobRangeVerified().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Sub(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& blobId : portion->GetBlobIds()) {
BlobGuards[producedId]->Sub(blobId.BlobSize(), blobId.BlobSize());
}
PortionRecordCountGuards[producedId]->Sub(portion->GetRecordsCount(), 1);
PortionSizeGuards[producedId]->Sub(portion->GetTotalBlobBytes(), 1);
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/counters/portions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
namespace NKikimr::NColumnShard {

void TPortionCategoryCounters::AddPortion(const std::shared_ptr<NOlap::TPortionInfo>& p) {
RecordsCount->Add(p->NumRows());
RecordsCount->Add(p->GetRecordsCount());
Count->Add(1);
BlobBytes->Add(p->GetTotalBlobBytes());
RawBytes->Add(p->GetTotalRawBytes());
}

void TPortionCategoryCounters::RemovePortion(const std::shared_ptr<NOlap::TPortionInfo>& p) {
RecordsCount->Remove(p->NumRows());
RecordsCount->Remove(p->GetRecordsCount());
Count->Remove(1);
BlobBytes->Remove(p->GetTotalBlobBytes());
RawBytes->Remove(p->GetTotalRawBytes());
Expand All @@ -29,7 +29,7 @@ void TSimplePortionsGroupInfo::AddPortion(const TPortionInfo& p) {
BlobBytes += p.GetTotalBlobBytes();
RawBytes += p.GetTotalRawBytes();
Count += 1;
RecordsCount += p.NumRows();
RecordsCount += p.GetRecordsCount();
ChunksCount += p.GetChunksCount();
}

Expand All @@ -41,7 +41,7 @@ void TSimplePortionsGroupInfo::RemovePortion(const TPortionInfo& p) {
BlobBytes -= p.GetTotalBlobBytes();
RawBytes -= p.GetTotalRawBytes();
Count -= 1;
RecordsCount -= p.NumRows();
RecordsCount -= p.GetRecordsCount();
ChunksCount -= p.GetChunksCount();
AFL_VERIFY(RawBytes >= 0);
AFL_VERIFY(BlobBytes >= 0);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
#include "transfer.h"
#include <ydb/core/tx/columnshard/data_sharing/modification/tasks/modification.h>

#include <ydb/core/tx/columnshard/data_sharing/manager/shared_blobs.h>
#include <ydb/core/tx/columnshard/data_sharing/modification/tasks/modification.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>

namespace NKikimr::NOlap::NDataSharing::NEvents {

THashMap<NKikimr::NOlap::TTabletId, NKikimr::NOlap::NDataSharing::TTaskForTablet> TPathIdData::BuildLinkTabletTasks(
const std::shared_ptr<IStoragesManager>& storages, const TTabletId selfTabletId, const TTransferContext& context, const TVersionedIndex& index) {
const std::shared_ptr<IStoragesManager>& storages, const TTabletId selfTabletId, const TTransferContext& context,
const TVersionedIndex& index) {
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
for (auto&& i : Portions) {
auto schema = i.GetSchema(index);
i.FillBlobIdsByStorage(blobIds, schema->GetIndexInfo());
TPortionDataAccessor(i).FillBlobIdsByStorage(blobIds, schema->GetIndexInfo());
}

const std::shared_ptr<TSharedBlobsManager> sharedBlobs = storages->GetSharedBlobsManager();
Expand Down Expand Up @@ -51,7 +54,9 @@ THashMap<NKikimr::NOlap::TTabletId, NKikimr::NOlap::NDataSharing::TTaskForTablet
for (auto&& [storageId, blobs] : blobsInfo) {
THashMap<TTabletId, TStorageTabletTask> storageTabletTasks;
for (auto&& [_, blobInfo] : blobs) {
THashMap<TTabletId, TStorageTabletTask> blobTabletTasks = context.GetMoving() ? blobInfo.BuildTabletTasksOnMove(context, selfTabletId, storageId) : blobInfo.BuildTabletTasksOnCopy(context, selfTabletId, storageId);
THashMap<TTabletId, TStorageTabletTask> blobTabletTasks = context.GetMoving()
? blobInfo.BuildTabletTasksOnMove(context, selfTabletId, storageId)
: blobInfo.BuildTabletTasksOnCopy(context, selfTabletId, storageId);
for (auto&& [tId, tInfo] : blobTabletTasks) {
auto itTablet = storageTabletTasks.find(tId);
if (itTablet == storageTabletTasks.end()) {
Expand All @@ -71,4 +76,4 @@ THashMap<NKikimr::NOlap::TTabletId, NKikimr::NOlap::NDataSharing::TTaskForTablet
return globalTabletTasks;
}

}
} // namespace NKikimr::NOlap::NDataSharing::NEvents
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TPathIdData {
void InitPortionIds(ui64* lastPortionId, const std::optional<ui64> pathId = {}) {
AFL_VERIFY(lastPortionId);
for (auto&& i : Portions) {
i.SetPortion(++*lastPortionId);
i.SetPortionId(++*lastPortionId);
if (pathId) {
i.SetPathId(*pathId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
#include <ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_ack_from_initiator.h>
#include <ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>

namespace NKikimr::NOlap::NDataSharing {

NKikimr::TConclusionStatus TDestinationSession::DataReceived(THashMap<ui64, NEvents::TPathIdData>&& data, TColumnEngineForLogs& index, const std::shared_ptr<IStoragesManager>& /*manager*/) {
NKikimr::TConclusionStatus TDestinationSession::DataReceived(
THashMap<ui64, NEvents::TPathIdData>&& data, TColumnEngineForLogs& index, const std::shared_ptr<IStoragesManager>& /*manager*/) {
auto guard = index.GranulesStorage->GetStats()->StartPackModification();
for (auto&& i : data) {
auto it = PathIds.find(i.first);
Expand Down Expand Up @@ -66,8 +68,8 @@ void TDestinationSession::SendCurrentCursorAck(const NColumnShard::TColumnShard&
AFL_VERIFY(found);
}

NKikimr::TConclusion<std::unique_ptr<NTabletFlatExecutor::ITransaction>> TDestinationSession::ReceiveData(
NColumnShard::TColumnShard* self, const THashMap<ui64, NEvents::TPathIdData>& data, const ui32 receivedPackIdx, const TTabletId sourceTabletId,
NKikimr::TConclusion<std::unique_ptr<NTabletFlatExecutor::ITransaction>> TDestinationSession::ReceiveData(NColumnShard::TColumnShard* self,
const THashMap<ui64, NEvents::TPathIdData>& data, const ui32 receivedPackIdx, const TTabletId sourceTabletId,
const std::shared_ptr<TDestinationSession>& selfPtr) {
auto result = GetCursorVerified(sourceTabletId).ReceiveData(receivedPackIdx);
if (!result) {
Expand All @@ -76,18 +78,21 @@ NKikimr::TConclusion<std::unique_ptr<NTabletFlatExecutor::ITransaction>> TDestin
return std::unique_ptr<NTabletFlatExecutor::ITransaction>(new TTxDataFromSource(self, selfPtr, data, sourceTabletId));
}

NKikimr::TConclusion<std::unique_ptr<NTabletFlatExecutor::ITransaction>> TDestinationSession::ReceiveFinished(NColumnShard::TColumnShard* self, const TTabletId sourceTabletId, const std::shared_ptr<TDestinationSession>& selfPtr) {
NKikimr::TConclusion<std::unique_ptr<NTabletFlatExecutor::ITransaction>> TDestinationSession::ReceiveFinished(
NColumnShard::TColumnShard* self, const TTabletId sourceTabletId, const std::shared_ptr<TDestinationSession>& selfPtr) {
if (GetCursorVerified(sourceTabletId).GetDataFinished()) {
return TConclusionStatus::Fail("session finished already");
}
return std::unique_ptr<NTabletFlatExecutor::ITransaction>(new TTxFinishFromSource(self, sourceTabletId, selfPtr));
}

NKikimr::TConclusion<std::unique_ptr<NTabletFlatExecutor::ITransaction>> TDestinationSession::AckInitiatorFinished(NColumnShard::TColumnShard* self, const std::shared_ptr<TDestinationSession>& selfPtr) {
NKikimr::TConclusion<std::unique_ptr<NTabletFlatExecutor::ITransaction>> TDestinationSession::AckInitiatorFinished(
NColumnShard::TColumnShard* self, const std::shared_ptr<TDestinationSession>& selfPtr) {
return std::unique_ptr<NTabletFlatExecutor::ITransaction>(new TTxFinishAckFromInitiator(self, selfPtr));
}

NKikimr::TConclusionStatus TDestinationSession::DeserializeDataFromProto(const NKikimrColumnShardDataSharingProto::TDestinationSession& proto, const TColumnEngineForLogs& index) {
NKikimr::TConclusionStatus TDestinationSession::DeserializeDataFromProto(
const NKikimrColumnShardDataSharingProto::TDestinationSession& proto, const TColumnEngineForLogs& index) {
if (!InitiatorController.DeserializeFromProto(proto.GetInitiatorController())) {
return TConclusionStatus::Fail("cannot parse initiator controller: " + proto.GetInitiatorController().DebugString());
}
Expand Down Expand Up @@ -139,7 +144,8 @@ NKikimrColumnShardDataSharingProto::TDestinationSession::TFullCursor TDestinatio
return result;
}

NKikimr::TConclusionStatus TDestinationSession::DeserializeCursorFromProto(const NKikimrColumnShardDataSharingProto::TDestinationSession::TFullCursor& proto) {
NKikimr::TConclusionStatus TDestinationSession::DeserializeCursorFromProto(
const NKikimrColumnShardDataSharingProto::TDestinationSession::TFullCursor& proto) {
ConfirmedFlag = proto.GetConfirmedFlag();
for (auto&& i : proto.GetSourceCursors()) {
TSourceCursorForDestination cursor;
Expand All @@ -154,13 +160,14 @@ NKikimr::TConclusionStatus TDestinationSession::DeserializeCursorFromProto(const
return TConclusionStatus::Success();
}

bool TDestinationSession::DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions) {
bool TDestinationSession::DoStart(
const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions) {
AFL_VERIFY(IsConfirmed());
NYDBTest::TControllers::GetColumnShardController()->OnDataSharingStarted(shard.TabletID(), GetSessionId());
THashMap<TString, THashSet<TUnifiedBlobId>> local;
for (auto&& i : portions) {
for (auto&& p : i.second) {
p->FillBlobIdsByStorage(local, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex());
TPortionDataAccessor(*p).FillBlobIdsByStorage(local, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex());
}
}
std::swap(CurrentBlobIds, local);
Expand All @@ -170,7 +177,7 @@ bool TDestinationSession::DoStart(const NColumnShard::TColumnShard& shard, const

bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionInfo& portion) {
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
portion.FillBlobIdsByStorage(blobIds, vIndex);
TPortionDataAccessor(portion).FillBlobIdsByStorage(blobIds, vIndex);
ui32 containsCounter = 0;
ui32 newCounter = 0;
for (auto&& i : blobIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ void TSourceCursor::BuildSelection(const std::shared_ptr<IStoragesManager>& stor
NextPortionId = itPortion->first;
} else {
portions.emplace_back(*itPortion->second);
chunksCount += portions.back().GetRecords().size();
chunksCount += portions.back().GetIndexes().size();
chunksCount += TPortionDataAccessor(portions.back()).GetRecords().size();
chunksCount += TPortionDataAccessor(portions.back()).GetIndexes().size();
++count;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "tx_data_ack_to_source.h"

#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>

namespace NKikimr::NOlap::NDataSharing {

Expand All @@ -11,7 +13,7 @@ bool TTxDataAckToSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc
auto& index = Self->GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex();
for (auto&& [_, i] : Session->GetCursorVerified()->GetPreviousSelected()) {
for (auto&& portion : i.GetPortions()) {
portion.FillBlobIdsByStorage(sharedBlobIds, index);
TPortionDataAccessor(portion).FillBlobIdsByStorage(sharedBlobIds, index);
}
}
for (auto&& i : sharedBlobIds) {
Expand All @@ -31,4 +33,4 @@ void TTxDataAckToSource::DoComplete(const TActorContext& /*ctx*/) {
Session->ActualizeDestination(*Self, Self->GetDataLocksManager());
}

}
} // namespace NKikimr::NOlap::NDataSharing
14 changes: 8 additions & 6 deletions ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#include "cleanup_portions.h"
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>

#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>

namespace NKikimr::NOlap {

Expand All @@ -22,8 +24,8 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC
}
THashMap<TString, THashSet<TUnifiedBlobId>> blobIdsByStorage;
for (auto&& p : PortionsToDrop) {
p.RemoveFromDatabase(context.DBWrapper);
p.FillBlobIdsByStorage(blobIdsByStorage, context.EngineLogs.GetVersionedIndex());
TPortionDataAccessor(p).RemoveFromDatabase(context.DBWrapper);
TPortionDataAccessor(p).FillBlobIdsByStorage(blobIdsByStorage, context.EngineLogs.GetVersionedIndex());
pathIds.emplace(p.GetPathId());
}
for (auto&& i : blobIdsByStorage) {
Expand All @@ -43,7 +45,7 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::T
if (self) {
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
for (auto&& p : PortionsToDrop) {
self->Counters.GetTabletCounters()->OnDropPortionEvent(p.GetTotalRawBytes(), p.GetTotalBlobBytes(), p.NumRows());
self->Counters.GetTabletCounters()->OnDropPortionEvent(p.GetTotalRawBytes(), p.GetTotalBlobBytes(), p.GetRecordsCount());
}
}
}
Expand All @@ -60,4 +62,4 @@ NColumnShard::ECumulativeCounters TCleanupPortionsColumnEngineChanges::GetCounte
return isSuccess ? NColumnShard::COUNTER_CLEANUP_SUCCESS : NColumnShard::COUNTER_CLEANUP_FAIL;
}

}
} // namespace NKikimr::NOlap
Loading

0 comments on commit 7ca8135

Please sign in to comment.