Skip to content

Commit

Permalink
portions index simplification (#12414)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 9, 2024
1 parent 9afb07b commit 3f0791d
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 359 deletions.
36 changes: 12 additions & 24 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,24 +240,13 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
}

{
auto& portionsIndex =
TablesManager.GetPrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>().GetGranuleVerified(writeMeta.GetTableId()).GetPortionsIndex();
{
const ui64 minMemoryRead = portionsIndex.GetMinRawMemoryRead();
if (NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit < minMemoryRead) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "overlimit")("reason", "read_raw_memory")("current", minMemoryRead)(
"limit", NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit)("table_id", writeMeta.GetTableId());
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::OverlimitReadRawMemory);
}
}

{
const ui64 minMemoryRead = portionsIndex.GetMinBlobMemoryRead();
if (NOlap::TGlobalLimits::DefaultBlobsMemoryIntervalLimit < minMemoryRead) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "overlimit")("reason", "read_blob_memory")("current", minMemoryRead)(
"limit", NOlap::TGlobalLimits::DefaultBlobsMemoryIntervalLimit)("table_id", writeMeta.GetTableId());
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::OverlimitReadBlobMemory);
}
auto status = TablesManager.GetPrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>()
.GetGranuleVerified(writeMeta.GetTableId())
.GetOptimizerPlanner()
.CheckWriteData();
if (status.IsFail()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "writing_fail_through_compaction")("reason", status.GetErrorMessage());
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::CompactionCriteria);
}
}

Expand Down Expand Up @@ -298,10 +287,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< Counters.GetWritesMonitor()->DebugString() << " at tablet " << TabletID());
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());

NOlap::TWritingContext context(TabletID(), SelfId(), snapshotSchema, StoragesManager,
Counters.GetIndexationCounters().SplitterCounters, Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot());
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(
BufferizationWriteActorId, std::move(writeData), context);
NOlap::TWritingContext context(TabletID(), SelfId(), snapshotSchema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot());
std::shared_ptr<NConveyor::ITask> task =
std::make_shared<NOlap::TBuildBatchesTask>(BufferizationWriteActorId, std::move(writeData), context);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
}
}
Expand Down Expand Up @@ -599,8 +588,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());
Y_ABORT_UNLESS(writeOperation);
writeOperation->SetBehaviour(behaviour);
NOlap::TWritingContext wContext(
pathId, SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
NOlap::TWritingContext wContext(pathId, SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max());
arrowData->SetSeparationPoints(GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(pathId)->GetBucketPositions());
writeOperation->Start(*this, arrowData, source, wContext);
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ enum class EWriteFailReason {
NoTable /* "no_table" */,
IncorrectSchema /* "incorrect_schema" */,
Overload /* "overload" */,
OverlimitReadRawMemory /* "overlimit_read_raw_memory" */,
OverlimitReadBlobMemory /* "overlimit_read_blob_memory" */
CompactionCriteria /* "compaction_criteria" */
};

class TWriteCounters: public TCommonCountersOwner {
Expand Down
33 changes: 2 additions & 31 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,38 +41,9 @@ std::shared_ptr<NArrow::TColumnFilter> TGeneralCompactColumnEngineChanges::Build
}
}
}
NArrow::TColumnFilter filterCorrection = NArrow::TColumnFilter::BuildDenyFilter();
auto pkSchema = resultSchema->GetIndexInfo().GetReplaceKey();
NArrow::NMerger::TRWSortableBatchPosition pos(batch, 0, pkSchema->field_names(), {}, false);
ui32 posCurrent = 0;
auto excludedIntervalsInfo = GranuleMeta->GetPortionsIndex().GetIntervalFeatures(pInfo, portionsInUsage);
for (auto&& i : excludedIntervalsInfo.GetExcludedIntervals()) {
NArrow::NMerger::TSortableBatchPosition startForFound(i.GetStart().ToBatch(pkSchema), 0, pkSchema->field_names(), {}, false);
NArrow::NMerger::TSortableBatchPosition finishForFound(i.GetFinish().ToBatch(pkSchema), 0, pkSchema->field_names(), {}, false);
auto foundStart =
NArrow::NMerger::TSortableBatchPosition::FindPosition(pos, pos.GetPosition(), batch->num_rows() - 1, startForFound, true);
AFL_VERIFY(foundStart);
AFL_VERIFY(!foundStart->IsLess())("pos", pos.DebugJson())("start", startForFound.DebugJson())("found", foundStart->DebugString());
auto foundFinish =
NArrow::NMerger::TSortableBatchPosition::FindPosition(pos, pos.GetPosition(), batch->num_rows() - 1, finishForFound, false);
AFL_VERIFY(foundFinish);
AFL_VERIFY(foundFinish->GetPosition() >= foundStart->GetPosition());
if (foundFinish->GetPosition() > foundStart->GetPosition()) {
AFL_VERIFY(!foundFinish->IsGreater())("pos", pos.DebugJson())("finish", finishForFound.DebugJson())(
"found", foundFinish->DebugString());
}
filterCorrection.Add(foundStart->GetPosition() - posCurrent, false);
if (foundFinish->IsGreater()) {
filterCorrection.Add(foundFinish->GetPosition() - foundStart->GetPosition(), true);
posCurrent = foundFinish->GetPosition();
} else {
filterCorrection.Add(foundFinish->GetPosition() - foundStart->GetPosition() + 1, true);
posCurrent = foundFinish->GetPosition() + 1;
}
if (GranuleMeta->GetPortionsIndex().HasOlderIntervals(pInfo, portionsInUsage)) {
filterDeleted = NArrow::TColumnFilter::BuildAllowFilter();
}
AFL_VERIFY(filterCorrection.Size() <= batch->num_rows());
filterCorrection.Add(false, batch->num_rows() - filterCorrection.Size());
filterDeleted = filterDeleted.Or(filterCorrection);
}
if (filter) {
*filter = filter->And(filterDeleted);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/storage/granule/granule.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ class TGranuleMeta: TNonCopyable {
return ActualizationIndex->CollectMetadataRequests(Portions);
}

// Read-only accessor for the optimizer planner held by this granule.
// Dereferences OptimizerPlanner without a null check.
// NOTE(review): presumably the planner is always set before this is called — confirm.
const NStorageOptimizer::IOptimizerPlanner& GetOptimizerPlanner() const {
    const NStorageOptimizer::IOptimizerPlanner& planner = *OptimizerPlanner;
    return planner;
}

std::shared_ptr<ITxReader> BuildLoader(const std::shared_ptr<IBlobGroupSelector>& dsGroupSelector, const TVersionedIndex& vIndex);
bool TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedIndex);
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& GetDataAccessorsManager() const {
Expand Down
116 changes: 14 additions & 102 deletions ydb/core/tx/columnshard/engines/storage/granule/portions_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,114 +3,26 @@

namespace NKikimr::NOlap::NGranule::NPortionsIndex {

TPortionsIndex::TPortionIntervals TPortionsIndex::GetIntervalFeatures(const TPortionInfo& inputPortion, const THashSet<ui64>& skipPortions) const {
auto itFrom = Points.find(inputPortion.IndexKeyStart());
AFL_VERIFY(itFrom != Points.end());
auto itTo = Points.find(inputPortion.IndexKeyEnd());
AFL_VERIFY(itTo != Points.end());
TPortionIntervals portionExcludeIntervals;
while (true) {
std::optional<NArrow::TReplaceKey> nextKey;
for (auto&& [p, _] : itFrom->second.GetPortionIds()) {
if (skipPortions.contains(p)) {
continue;
}
const auto& portionCross = Owner.GetPortionVerified(p);
if (!portionCross.CrossSSWith(inputPortion)) {
continue;
}
if (!nextKey || *nextKey < portionCross.IndexKeyEnd()) {
nextKey = portionCross.IndexKeyEnd();
}
bool TPortionsIndex::HasOlderIntervals(const TPortionInfo& inputPortion, const THashSet<ui64>& skipPortions) const {
for (auto&& [_, p] : Portions) {
if (p->GetPortionId() == inputPortion.GetPortionId()) {
continue;
}
if (nextKey) {
nextKey = std::min(inputPortion.IndexKeyEnd(), *nextKey);
portionExcludeIntervals.Add(itFrom->first, *nextKey);
auto itFromNext = Points.find(*nextKey);
AFL_VERIFY(itFromNext != Points.end());
if (itFromNext == itTo) {
break;
}
if (itFromNext == itFrom) {
++itFrom;
} else {
itFrom = itFromNext;
}
AFL_VERIFY(itFrom != Points.end());
} else {
if (itFrom == itTo) {
break;
}
++itFrom;
AFL_VERIFY(itFrom != Points.end());
if (inputPortion.IndexKeyEnd() < p->IndexKeyStart()) {
continue;
}

}
return portionExcludeIntervals;
}

void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
auto itFrom = Points.find(p->IndexKeyStart());
AFL_VERIFY(itFrom != Points.end());
auto itTo = Points.find(p->IndexKeyEnd());
AFL_VERIFY(itTo != Points.end());
{
const TPortionInfoStat stat(p);
auto it = itFrom;
while (true) {
RemoveFromMemoryUsageControl(it->second.GetIntervalStats());
it->second.RemoveContained(stat);
RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
if (it == itTo) {
break;
}
AFL_VERIFY(++it != Points.end());
}
}
if (itFrom != itTo) {
itFrom->second.RemoveStart(p);
if (itFrom->second.IsEmpty()) {
RemoveFromMemoryUsageControl(itFrom->second.GetIntervalStats());
Points.erase(itFrom);
if (p->IndexKeyEnd() < inputPortion.IndexKeyStart()) {
continue;
}
itTo->second.RemoveFinish(p);
if (itTo->second.IsEmpty()) {
RemoveFromMemoryUsageControl(itTo->second.GetIntervalStats());
Points.erase(itTo);
if (skipPortions.contains(p->GetPortionId())) {
continue;
}
} else {
itTo->second.RemoveStart(p);
itTo->second.RemoveFinish(p);
if (itTo->second.IsEmpty()) {
RemoveFromMemoryUsageControl(itTo->second.GetIntervalStats());
Points.erase(itTo);
}
}
RawMemoryUsage.FlushCounters();
BlobMemoryUsage.FlushCounters();
}

void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
auto itFrom = InsertPoint(p->IndexKeyStart());
itFrom->second.AddStart(p);
auto itTo = InsertPoint(p->IndexKeyEnd());
itTo->second.AddFinish(p);

auto it = itFrom;
const TPortionInfoStat stat(p);
while (true) {
RemoveFromMemoryUsageControl(it->second.GetIntervalStats());
it->second.AddContained(stat);
RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
if (it == itTo) {
break;
if (inputPortion.RecordSnapshotMax() < p->RecordSnapshotMin()) {
continue;
}
AFL_VERIFY(++it != Points.end());
return true;
}
RawMemoryUsage.FlushCounters();
BlobMemoryUsage.FlushCounters();
return false;
}

}
Loading

0 comments on commit 3f0791d

Please sign in to comment.