Skip to content

Commit

Permalink
data accessor has to own portion info (#11060)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Oct 29, 2024
1 parent 7c45196 commit 08f5064
Show file tree
Hide file tree
Showing 44 changed files with 226 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
} else {
granule.InsertPortionOnExecute(txc, NOlap::TPortionDataAccessor(*portion.GetPortionInfo()));
granule.InsertPortionOnExecute(txc, NOlap::TPortionDataAccessor(portion.GetPortionInfo()));
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/data_locks/locks/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ class TListPortionsLock: public ILock {
}
}

// Builds a lock over portions that are held through shared const pointers.
// `lockName` and `readOnly` are forwarded to the base lock unchanged; for each
// entry the portion address is recorded in Portions and its path id in Granules.
// Entries are dereferenced unconditionally, so every pointer must be non-null.
TListPortionsLock(const TString& lockName, const std::vector<TPortionInfo::TConstPtr>& portions, const bool readOnly = false)
    : TBase(lockName, readOnly)
{
    for (const auto& portion : portions) {
        Portions.emplace(portion->GetAddress());
        Granules.emplace(portion->GetPathId());
    }
}

TListPortionsLock(const TString& lockName, const std::vector<TPortionInfo>& portions, const bool readOnly = false)
: TBase(lockName, readOnly) {
for (auto&& p : portions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ THashMap<NKikimr::NOlap::TTabletId, NKikimr::NOlap::NDataSharing::TTaskForTablet
const TVersionedIndex& index) {
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
for (auto&& i : Portions) {
auto schema = i.GetSchema(index);
auto schema = i->GetSchema(index);
TPortionDataAccessor(i).FillBlobIdsByStorage(blobIds, schema->GetIndexInfo());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace NKikimr::NOlap::NDataSharing::NEvents {
class TPathIdData {
private:
YDB_READONLY(ui64, PathId, 0);
YDB_ACCESSOR_DEF(std::vector<TPortionInfo>, Portions);
YDB_ACCESSOR_DEF(std::vector<TPortionInfo::TPtr>, Portions);

TPathIdData() = default;

Expand All @@ -31,7 +31,7 @@ class TPathIdData {
}
PathId = proto.GetPathId();
for (auto&& portionProto : proto.GetPortions()) {
TConclusion<TPortionInfo> portion = TPortionInfo::BuildFromProto(portionProto, indexInfo);
TConclusion<TPortionInfo::TPtr> portion = TPortionInfo::BuildFromProto(portionProto, indexInfo);
if (!portion) {
return portion.GetError();
}
Expand All @@ -41,12 +41,12 @@ class TPathIdData {
}

public:
TPathIdData(const ui64 pathId, const std::vector<TPortionInfo>& portions)
TPathIdData(const ui64 pathId, const std::vector<TPortionInfo::TPtr>& portions)
: PathId(pathId)
, Portions(portions) {
}

std::vector<TPortionInfo> DetachPortions() {
std::vector<TPortionInfo::TPtr> DetachPortions() {
return std::move(Portions);
}
THashMap<TTabletId, TTaskForTablet> BuildLinkTabletTasks(const std::shared_ptr<IStoragesManager>& storages, const TTabletId selfTabletId,
Expand All @@ -55,9 +55,9 @@ class TPathIdData {
void InitPortionIds(ui64* lastPortionId, const std::optional<ui64> pathId = {}) {
AFL_VERIFY(lastPortionId);
for (auto&& i : Portions) {
i.SetPortionId(++*lastPortionId);
i->SetPortionId(++*lastPortionId);
if (pathId) {
i.SetPathId(*pathId);
i->SetPathId(*pathId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(
auto it = PathIds.find(i.first);
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
for (auto&& portion : i.second.DetachPortions()) {
portion.SetPathId(it->second);
index.AppendPortion(std::move(portion));
portion->SetPathId(it->second);
index.AppendPortion(*portion);
}
}
return TConclusionStatus::Success();
Expand Down Expand Up @@ -167,15 +167,15 @@ bool TDestinationSession::DoStart(
THashMap<TString, THashSet<TUnifiedBlobId>> local;
for (auto&& i : portions) {
for (auto&& p : i.second) {
TPortionDataAccessor(*p).FillBlobIdsByStorage(local, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex());
TPortionDataAccessor(p).FillBlobIdsByStorage(local, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex());
}
}
std::swap(CurrentBlobIds, local);
SendCurrentCursorAck(shard, {});
return true;
}

bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionInfo& portion) {
bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionInfo::TConstPtr& portion) {
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
TPortionDataAccessor(portion).FillBlobIdsByStorage(blobIds, vIndex);
ui32 containsCounter = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class TDestinationSession: public TCommonSession {
}

public:
bool TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionInfo& portion);
bool TryTakePortionBlobs(const TVersionedIndex& vIndex, const std::shared_ptr<const TPortionInfo>& portion);

TSourceCursorForDestination& GetCursorVerified(const TTabletId& tabletId) {
auto it = Cursors.find(tabletId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TTxDataFromSource::TTxDataFromSource(NColumnShard::TColumnShard* self, const std
++p;
} else {
i.second.MutablePortions()[p] = std::move(i.second.MutablePortions().back());
i.second.MutablePortions()[p].ResetShardingVersion();
i.second.MutablePortions()[p]->ResetShardingVersion();
i.second.MutablePortions().pop_back();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ void TSourceCursor::BuildSelection(const std::shared_ptr<IStoragesManager>& stor
ui32 chunksCount = 0;
bool selectMore = true;
for (; itCurrentPath != PortionsForSend.end() && selectMore; ++itCurrentPath) {
std::vector<TPortionInfo> portions;
std::vector<TPortionInfo::TPtr> portions;
for (; itPortion != itCurrentPath->second.end(); ++itPortion) {
selectMore = (count < 10000 && chunksCount < 1000000);
if (!selectMore) {
NextPathId = itCurrentPath->first;
NextPortionId = itPortion->first;
} else {
portions.emplace_back(*itPortion->second);
portions.emplace_back(itPortion->second);
chunksCount += TPortionDataAccessor(portions.back()).GetRecords().size();
chunksCount += TPortionDataAccessor(portions.back()).GetIndexes().size();
++count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TSharedBlobsManager;

class TSourceCursor {
private:
std::map<ui64, std::map<ui32, std::shared_ptr<TPortionInfo>>> PortionsForSend;
std::map<ui64, std::map<ui32, TPortionInfo::TPtr>> PortionsForSend;
THashMap<ui64, NEvents::TPathIdData> PreviousSelected;
THashMap<ui64, NEvents::TPathIdData> Selected;
THashMap<TTabletId, TTaskForTablet> Links;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class TColumnEngineChanges {
public:
class IMemoryPredictor {
public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) = 0;
virtual ui64 AddPortion(const TPortionInfo::TConstPtr& portionInfo) = 0;
virtual ~IMemoryPredictor() = default;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ bool TTieringProcessContext::AddPortion(
}
features.OnSkipPortionWithTxLimit(Counters, *dWait);
}
it->second.back().MutableMemoryUsage() = it->second.back().GetMemoryPredictor()->AddPortion(*info);
it->second.back().MutableMemoryUsage() = it->second.back().GetMemoryPredictor()->AddPortion(info);
}
it->second.back().MutableTxWriteVolume() += info->GetTxVolume();
if (features.GetTargetTierName() == NTiering::NCommon::DeleteTierName) {
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ void TCleanupPortionsColumnEngineChanges::DoDebugString(TStringOutput& out) cons
if (ui32 dropped = PortionsToDrop.size()) {
out << "drop " << dropped << " portions";
for (auto& portionInfo : PortionsToDrop) {
out << portionInfo.DebugString();
out << portionInfo->DebugString();
}
}
}
Expand All @@ -26,7 +26,7 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC
for (auto&& p : PortionsToDrop) {
TPortionDataAccessor(p).RemoveFromDatabase(context.DBWrapper);
TPortionDataAccessor(p).FillBlobIdsByStorage(blobIdsByStorage, context.EngineLogs.GetVersionedIndex());
pathIds.emplace(p.GetPathId());
pathIds.emplace(p->GetPathId());
}
for (auto&& i : blobIdsByStorage) {
auto action = BlobsAction.GetRemoving(i.first);
Expand All @@ -38,14 +38,14 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC

void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
for (auto& portionInfo : PortionsToDrop) {
if (!context.EngineLogs.ErasePortion(portionInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString());
if (!context.EngineLogs.ErasePortion(*portionInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo->DebugString());
}
}
if (self) {
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
for (auto&& p : PortionsToDrop) {
self->Counters.GetTabletCounters()->OnDropPortionEvent(p.GetTotalRawBytes(), p.GetTotalBlobBytes(), p.GetRecordsCount());
self->Counters.GetTabletCounters()->OnDropPortionEvent(p->GetTotalRawBytes(), p->GetTotalBlobBytes(), p->GetRecordsCount());
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/cleanup_portions.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges {

}

std::vector<TPortionInfo> PortionsToDrop;
std::vector<TPortionInfo::TConstPtr> PortionsToDrop;

virtual ui32 GetWritePortionsCount() const override {
return 0;
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
dataColumnIds = ISnapshotSchema::GetColumnsWithDifferentDefaults(schemas, resultSchema);
}
for (auto&& i : SwitchedPortions) {
stats->Merge(TPortionDataAccessor(*i).GetSerializationStat(*resultSchema));
stats->Merge(TPortionDataAccessor(i).GetSerializationStat(*resultSchema));
if (i->GetMeta().GetDeletionsCount()) {
dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
}
if (dataColumnIds.size() != resultSchema->GetColumnsCount()) {
for (auto id : TPortionDataAccessor(*i).GetColumnIds()) {
for (auto id : TPortionDataAccessor(i).GetColumnIds()) {
if (resultSchema->HasColumnId(id)) {
dataColumnIds.emplace(id);
}
Expand Down Expand Up @@ -236,8 +236,8 @@ std::shared_ptr<TGeneralCompactColumnEngineChanges::IMemoryPredictor> TGeneralCo
return std::make_shared<TMemoryPredictorChunkedPolicy>();
}

ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo& portionInfo) {
SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)) + portionInfo.GetTotalBlobBytes();
ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo::TConstPtr& portionInfo) {
SumMemoryFix += portionInfo->GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)) + portionInfo->GetTotalBlobBytes();
++PortionsCount;
SumMemoryDelta = 0;

Expand Down Expand Up @@ -269,7 +269,7 @@ ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPorti
advanceIterator(columnId, maxChunkSize);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_prediction_after", SumMemoryFix + SumMemoryDelta)(
"portion_info", portionInfo.DebugString());
"portion_info", portionInfo->DebugString());
return SumMemoryFix + SumMemoryDelta;
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
auto predictor = BuildMemoryPredictor();
ui64 result = 0;
for (auto& p : SwitchedPortions) {
result = predictor->AddPortion(*p);
result = predictor->AddPortion(p);
}
return result;
}
Expand Down Expand Up @@ -65,7 +65,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
std::list<TColumnInfo> MaxMemoryByColumnChunk;

public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) override;
virtual ui64 AddPortion(const TPortionInfo::TConstPtr& portionInfo) override;
};

static std::shared_ptr<IMemoryPredictor> BuildMemoryPredictor();
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/changes/ttl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TTTLColumnEngineChanges: public TChangesWithAppend {
auto predictor = BuildMemoryPredictor();
ui64 result = 0;
for (auto& p : PortionsToEvict) {
result = predictor->AddPortion(*p.GetPortionInfo());
result = predictor->AddPortion(p.GetPortionInfo());
}
return result;
}
Expand All @@ -66,11 +66,11 @@ class TTTLColumnEngineChanges: public TChangesWithAppend {
ui64 SumBlobsMemory = 0;
ui64 MaxRawMemory = 0;
public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) override {
if (MaxRawMemory < portionInfo.GetTotalRawBytes()) {
MaxRawMemory = portionInfo.GetTotalRawBytes();
virtual ui64 AddPortion(const TPortionInfo::TConstPtr& portionInfo) override {
if (MaxRawMemory < portionInfo->GetTotalRawBytes()) {
MaxRawMemory = portionInfo->GetTotalRawBytes();
}
SumBlobsMemory += portionInfo.GetTotalBlobBytes();
SumBlobsMemory += portionInfo->GetTotalBlobBytes();
return SumBlobsMemory + MaxRawMemory;
}
};
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
};
AppendedPortions.erase(std::remove_if(AppendedPortions.begin(), AppendedPortions.end(), predRemoveDroppedTable), AppendedPortions.end());
for (auto& portionInfoWithBlobs : AppendedPortions) {
auto& portionInfo = portionInfoWithBlobs.GetPortionResult();
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
const auto& portionInfo = portionInfoWithBlobs.GetPortionResultPtr();
AFL_VERIFY(usedPortionIds.emplace(portionInfo->GetPortionId()).second)("portion_info", portionInfo->DebugString(true));
TPortionDataAccessor(portionInfo).SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
}
for (auto&& [_, i] : PortionsToMove) {
Expand Down Expand Up @@ -108,7 +108,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
portion->SetRemoveSnapshot(context.Snapshot);
};
context.EngineLogs.ModifyPortionOnComplete(i, pred);
context.EngineLogs.AddCleanupPortion(*i);
context.EngineLogs.AddCleanupPortion(i);
}
for (auto& portionBuilder : AppendedPortions) {
context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult());
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) {
for (const auto& [_, portionInfo] : spg->GetPortions()) {
UpdatePortionStats(*portionInfo, EStatsUpdateType::ADD);
if (portionInfo->CheckForCleanup()) {
AddCleanupPortion(*portionInfo);
AddCleanupPortion(portionInfo);
}
}
}
Expand Down Expand Up @@ -382,7 +382,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
limitExceeded = true;
break;
}
changes->PortionsToDrop.push_back(*info);
changes->PortionsToDrop.push_back(info);
++portionsFromDrop;
}
}
Expand All @@ -400,9 +400,9 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
++i;
continue;
}
AFL_VERIFY(it->second[i].CheckForCleanup(snapshot))("p_snapshot", it->second[i].GetRemoveSnapshotOptional())("snapshot", snapshot);
if (txSize + it->second[i].GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
txSize += it->second[i].GetTxVolume();
AFL_VERIFY(it->second[i]->CheckForCleanup(snapshot))("p_snapshot", it->second[i]->GetRemoveSnapshotOptional())("snapshot", snapshot);
if (txSize + it->second[i]->GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
txSize += it->second[i]->GetTxVolume();
} else {
limitExceeded = true;
break;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ class TColumnEngineForLogs: public IColumnEngine {
return TabletId;
}

void AddCleanupPortion(const TPortionInfo& info) {
AFL_VERIFY(info.HasRemoveSnapshot());
CleanupPortions[info.GetRemoveSnapshotVerified().GetPlanInstant()].emplace_back(info);
void AddCleanupPortion(const TPortionInfo::TConstPtr& info) {
AFL_VERIFY(info->HasRemoveSnapshot());
CleanupPortions[info->GetRemoveSnapshotVerified().GetPlanInstant()].emplace_back(info);
}
void AddShardingInfo(const TGranuleShardingInfo& shardingInfo) {
VersionedIndex.AddShardingInfo(shardingInfo);
Expand All @@ -205,7 +205,7 @@ class TColumnEngineForLogs: public IColumnEngine {
TVersionedIndex VersionedIndex;
ui64 TabletId;
TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id
std::map<TInstant, std::vector<TPortionInfo>> CleanupPortions;
std::map<TInstant, std::vector<TPortionInfo::TConstPtr>> CleanupPortions;
TColumnEngineStats Counters;
ui64 LastPortion;
ui64 LastGranule;
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization)
}
AFL_VERIFY(itRecord == Records.end());
AFL_VERIFY(itBlobIdx == BlobIdxs.end());
} else {
for (auto&& i : Records) {
AFL_VERIFY(i.BlobRange.IsValid());
}
for (auto&& i : Indexes) {
if (auto* blobId = i.GetBlobRangeOptional()) {
AFL_VERIFY(blobId->IsValid());
}
}
}

result.Indexes = std::move(Indexes);
Expand Down
Loading

0 comments on commit 08f5064

Please sign in to comment.