Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cleanup volume limits and speed up versions index copy #12249

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TCommittedData*>&& da
auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data));
Y_ABORT_UNLESS(indexChanges);

auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
indexChanges->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, indexChanges, Settings.CacheDataAfterIndexing);

Expand Down Expand Up @@ -875,7 +875,7 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
compaction->SetQueueGuard(guard);
compaction->Start(*this);

auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
auto request = compaction->ExtractDataAccessorsRequest();
const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) +
indexChanges->CalcMemoryForUsage();
Expand Down Expand Up @@ -973,7 +973,7 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
return false;
}

auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
for (auto&& i : indexChanges) {
i->Start(*this);
auto request = i->ExtractDataAccessorsRequest();
Expand Down Expand Up @@ -1033,7 +1033,7 @@ void TColumnShard::SetupCleanupPortions() {
changes->Start(*this);

auto request = changes->ExtractDataAccessorsRequest();
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema());
const auto subscriber = std::make_shared<TCleanupPortionsDataAccessorsSubscriber>(SelfId(), changes, actualIndexInfo);

Expand Down Expand Up @@ -1064,7 +1064,7 @@ void TColumnShard::SetupCleanupTables() {
}

ACFL_DEBUG("background", "cleanup")("changes_info", changes->DebugString());
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, changes, false);
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ class IColumnEngine {

virtual std::vector<TCSMetadataRequest> CollectMetadataRequests() const = 0;
virtual const TVersionedIndex& GetVersionedIndex() const = 0;
virtual const std::shared_ptr<TVersionedIndex>& GetVersionedIndexReadonlyCopy() = 0;
virtual std::shared_ptr<TVersionedIndex> CopyVersionedIndexPtr() const = 0;
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const;

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
ui32 skipLocked = 0;
ui32 portionsFromDrop = 0;
bool limitExceeded = false;
const ui32 maxChunksCount = 100000;
const ui32 maxChunksCount = 500000;
const ui32 maxPortionsCount = 1000;
for (ui64 pathId : pathsToDrop) {
auto g = GranulesStorage->GetGranuleOptional(pathId);
Expand Down Expand Up @@ -401,7 +401,8 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
}
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())(
"portions_prepared", changes->GetPortionsToDrop().size())("drop", portionsFromDrop)("skip", skipLocked);
"portions_prepared", changes->GetPortionsToDrop().size())("drop", portionsFromDrop)("skip", skipLocked)("portions_counter", portionsCount)(
"chunks", chunksCount)("limit", limitExceeded)("max_portions", maxPortionsCount)("max_chunks", maxChunksCount);

if (changes->GetPortionsToDrop().empty()) {
return nullptr;
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,17 @@ class TColumnEngineForLogs: public IColumnEngine {

std::shared_ptr<NActualizer::TController> ActualizationController;
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>();
TVersionedIndex VersionedIndex;
std::shared_ptr<TVersionedIndex> VersionedIndexCopy;

public:
virtual const std::shared_ptr<TVersionedIndex>& GetVersionedIndexReadonlyCopy() override {
if (!VersionedIndexCopy || !VersionedIndexCopy->IsEqualTo(VersionedIndex)) {
VersionedIndexCopy = std::make_shared<TVersionedIndex>(VersionedIndex);
}
return VersionedIndexCopy;
}

const std::shared_ptr<NActualizer::TController>& GetActualizationController() const {
return ActualizationController;
}
Expand Down Expand Up @@ -224,7 +233,6 @@ class TColumnEngineForLogs: public IColumnEngine {
void AppendPortion(const TPortionDataAccessor& portionInfo, const bool addAsAccessor = true);

private:
TVersionedIndex VersionedIndex;
ui64 TabletId;
TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id
std::map<TInstant, std::vector<TPortionInfo::TConstPtr>> CleanupPortions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ ui64 TPortionInfo::GetMetadataMemorySize() const {
}

ui64 TPortionInfo::GetApproxChunksCount(const ui32 schemaColumnsCount) const {
return schemaColumnsCount * 256 * (GetRecordsCount() / 10000 + 1);
return schemaColumnsCount * (GetRecordsCount() / 10000 + 1);
}

void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class TVersionedIndex {
ISnapshotSchema::TPtr SchemeForActualization;

public:
bool IsEqualTo(const TVersionedIndex& vIndex) {
return LastSchemaVersion == vIndex.LastSchemaVersion && SnapshotByVersion.size() == vIndex.SnapshotByVersion.size() &&
ShardingInfo.size() == vIndex.ShardingInfo.size() && SchemeVersionForActualization == vIndex.SchemeVersionForActualization;
}

ISnapshotSchema::TPtr GetLastCriticalSchema() const {
return SchemeForActualization;
}
Expand Down
Loading