From 37aa5ec8db81acf74eb0ba2fef61e6c838d33660 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 13:51:56 +0300 Subject: [PATCH 01/13] speed up source usage detection --- .../reader/plain_reader/iterator/context.cpp | 18 ++++++------------ .../reader/plain_reader/iterator/source.h | 3 +++ 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index 53510f02f2b9..756262153600 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -32,22 +32,16 @@ ui64 TSpecialReadContext::GetMemoryForSources(const THashMap TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr& source) { const bool needSnapshots = !source->GetExclusiveIntervalOnly() || ReadMetadata->GetRequestSnapshot() < source->GetRecordSnapshotMax() || !source->IsSourceInMemory(); - bool partialUsageByPK = false; - { - const TPKRangeFilter::EUsageClass usage = - ReadMetadata->GetPKRangesFilter().IsPortionInPartialUsage(source->GetStartReplaceKey(), source->GetFinishReplaceKey()); - switch (usage) { + const bool partialUsageByPK = [&]() { + switch (source->GetPKUsageClass()) { case TPKRangeFilter::EUsageClass::PartialUsage: - partialUsageByPK = true; - break; + return true; case TPKRangeFilter::EUsageClass::DontUsage: - partialUsageByPK = true; - break; + return true; case TPKRangeFilter::EUsageClass::FullUsage: - partialUsageByPK = false; - break; + return false; } - } + }(); const bool useIndexes = (IndexChecker ? source->HasIndexes(IndexChecker->GetIndexIds()) : false); const bool isWholeExclusiveSource = source->GetExclusiveIntervalOnly() && source->IsSourceInMemory(); const bool hasDeletions = source->GetHasDeletions(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 82147598c216..8121c73a6296 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -48,6 +48,7 @@ class IDataSource { std::vector> ResourceGuards; std::optional FirstIntervalId; ui32 CurrentPlanStepIndex = 0; + YDB_READONLY(TPKRangeFilter::EUsageClass, UsageClass, TPKRangeFilter::EUsageClass::PartialUsage); protected: bool IsSourceInMemoryFlag = true; @@ -245,6 +246,8 @@ class IDataSource { , RecordsCount(recordsCount) , ShardingVersionOptional(shardingVersion) , HasDeletions(hasDeletions) { + UsageClass = Context->GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(GetStartReplaceKey(), GetFinishReplaceKey()); + AFL_VERIFY(UsageClass != TPKRangeFilter::EUsageClass::DontUsage); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugJson())("finish", Finish.DebugJson()); if (Start.IsReverseSort()) { std::swap(Start, Finish); From 034d9d3af2a63718fa9a456bc8250220358c0cdf Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 14:16:12 +0300 Subject: [PATCH 02/13] speed up --- .../tx/columnshard/engines/portions/portion_info.cpp | 12 ++++++------ .../tx/columnshard/engines/portions/portion_info.h | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 1884b98c862b..f40e356a099f 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -36,21 +36,21 @@ ui64 TPortionInfo::GetColumnRawBytes(const std::vector& columnIds, const b return GetColumnRawBytes(std::set(columnIds.begin(), columnIds.end()), validation); } -ui64 TPortionInfo::GetColumnRawBytes(const std::optional>& entityIds, const bool validation) const { +ui64 TPortionInfo::GetColumnRawBytes(const std::set& entityIds, const bool validation) const { ui64 sum = 0; const auto aggr = [&](const TColumnRecord& r) { sum += r.GetMeta().GetRawBytes(); }; - AggregateIndexChunksData(aggr, Records, entityIds, validation); + AggregateIndexChunksData(aggr, Records, &entityIds, validation); return sum; } -ui64 TPortionInfo::GetColumnBlobBytes(const std::optional>& entityIds, const bool validation) const { +ui64 TPortionInfo::GetColumnBlobBytes(const std::set& entityIds, const bool validation) const { ui64 sum = 0; const auto aggr = [&](const TColumnRecord& r) { sum += r.GetBlobRange().GetSize(); }; - AggregateIndexChunksData(aggr, Records, entityIds, validation); + AggregateIndexChunksData(aggr, Records, &entityIds, validation); return sum; } @@ -58,12 +58,12 @@ ui64 TPortionInfo::GetColumnBlobBytes(const std::vector& columnIds, const return GetColumnBlobBytes(std::set(columnIds.begin(), columnIds.end()), validation); } -ui64 TPortionInfo::GetIndexRawBytes(const std::optional>& entityIds, const bool validation) const { +ui64 TPortionInfo::GetIndexRawBytes(const std::set& entityIds, const bool validation) const { ui64 sum = 0; const auto aggr = [&](const TIndexChunk& r) { sum += r.GetRawBytes(); }; - AggregateIndexChunksData(aggr, Indexes, entityIds, validation); + AggregateIndexChunksData(aggr, Indexes, &entityIds, validation); return sum; } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index b3d51b961186..9dd32e36a884 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -98,7 +98,7 @@ class TPortionInfo { } template - static void AggregateIndexChunksData(const TAggregator& aggr, const std::vector& chunks, const std::optional>& columnIds, const bool validation) { + static void AggregateIndexChunksData(const TAggregator& aggr, const std::vector& chunks, const std::set* columnIds, const bool validation) { if (columnIds) { auto itColumn = columnIds->begin(); auto itRecord = chunks.begin(); @@ -547,7 +547,7 @@ class TPortionInfo { } ui64 GetColumnRawBytes(const std::vector& columnIds, const bool validation = true) const; - ui64 GetColumnRawBytes(const std::optional>& columnIds = {}, const bool validation = true) const; + ui64 GetColumnRawBytes(const std::set& columnIds, const bool validation = true) const; ui64 GetColumnBlobBytes(const std::vector& columnIds, const bool validation = true) const; ui64 GetColumnBlobBytes(const std::optional>& columnIds = {}, const bool validation = true) const; From 90b12ecf164d5b956ac65422e0befc2e013b95fa Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 14:43:46 +0300 Subject: [PATCH 03/13] speed up --- .../reader/plain_reader/iterator/scanner.cpp | 72 ++++++++++--------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp index 9062602df80f..a845caf684b2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp @@ -133,6 +133,7 @@ class TSourcesStorageForMemoryOptimization { private: class TSourceInfo { private: + YDB_READONLY(ui64, Memory, 0); YDB_READONLY_DEF(std::shared_ptr, Source); YDB_READONLY_DEF(std::shared_ptr, FetchingInfo); @@ -140,19 +141,33 @@ class TSourcesStorageForMemoryOptimization { TSourceInfo(const std::shared_ptr& source, const std::shared_ptr& fetchingInfo) : Source(source) , FetchingInfo(fetchingInfo) { + Memory = fetching->PredictRawBytes(Source); } NJson::TJsonValue DebugJson() const { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("source", Source->DebugJsonForMemory()); + result.InsertValue("memory", Memory); // result.InsertValue("fetching", Fetching->DebugJsonForMemory()); return result; } + + bool ReduceMemory() { + const bool result = FetchingInfo->InitSourceSeqColumnIds(Source); + if (result) { + Memory = fetching->PredictRawBytes(source); + } + return result; + } + + bool operator<(const TSourceInfo& item) const { + return Memory < item.Memory; + } + }; - std::map> Sources; + std::vector Sources; YDB_READONLY(ui64, MemorySum, 0); - YDB_READONLY_DEF(std::set, PathIds); public: TString DebugString() const { @@ -170,47 +185,36 @@ class TSourcesStorageForMemoryOptimization { return resultJson.GetStringRobust(); } - void UpdateSource(const ui64 oldMemoryInfo, const ui32 sourceIdx) { - auto it = Sources.find(oldMemoryInfo); - AFL_VERIFY(it != Sources.end()); - auto itSource = it->second.find(sourceIdx); - AFL_VERIFY(itSource != it->second.end()); - auto sourceInfo = itSource->second; - it->second.erase(itSource); - if (it->second.empty()) { - Sources.erase(it); - } - AFL_VERIFY(MemorySum >= oldMemoryInfo); - MemorySum -= oldMemoryInfo; - AddSource(sourceInfo.GetSource(), sourceInfo.GetFetchingInfo()); - } - void AddSource(const std::shared_ptr& source, const std::shared_ptr& fetching) { const ui64 sourceMemory = fetching->PredictRawBytes(source); MemorySum += sourceMemory; - AFL_VERIFY(Sources[sourceMemory].emplace(source->GetSourceIdx(), TSourceInfo(source, fetching)).second); - PathIds.emplace(source->GetPathId()); + Sources.emplace_back(TSourceInfo(sourceMemory, source, fetching)); } bool Optimize(const ui64 memoryLimit) { - bool modified = true; - while (MemorySum > memoryLimit && modified) { - modified = false; - for (auto it = Sources.rbegin(); it != Sources.rend(); ++it) { - for (auto&& [sourceIdx, sourceInfo] : it->second) { - if (!sourceInfo.GetFetchingInfo()->InitSourceSeqColumnIds(sourceInfo.GetSource())) { - continue; - } - modified = true; - UpdateSource(it->first, sourceIdx); - break; - } - if (modified) { - break; + if (MemorySum <= memoryLimit) { + return true; + } + std::sort(Sources.begin(), Sources.end()); + while (true) { + std::vector nextSources; + while (memoryLimit < MemorySum && Sources.size()) { + const ui64 currentMemory = Sources.back().GetMemory(); + if (Sources.back().ReduceMemory()) { + AFL_VERIFY(currentMemory <= MemorySum); + MemorySum -= currentMemory; + MemorySum += Sources.back().GetMemory(); + nextSources.emplace_back(std::move(Sources.back())); } + Sources.pop_back(); + } + if (nextSources.empty() || MemorySum <= memoryLimit) { + break; } + std::sort(nextSources.begin(), nextSources.end()); + std::swap(nextSources, Sources); } - return MemorySum < memoryLimit; + return MemorySum <= memoryLimit; } }; From a4a2d061c33dc128fc41dba48dae826b2d3caaf2 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 14:44:07 +0300 Subject: [PATCH 04/13] fixes --- .../engines/portions/portion_info.cpp | 22 ++++++++++++++----- .../engines/portions/portion_info.h | 9 ++++---- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index f40e356a099f..0b3bc4703aad 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -32,10 +32,6 @@ std::shared_ptr TPortionInfo::MaxValue(ui32 columnId) const { return result; } -ui64 TPortionInfo::GetColumnRawBytes(const std::vector& columnIds, const bool validation) const { - return GetColumnRawBytes(std::set(columnIds.begin(), columnIds.end()), validation); -} - ui64 TPortionInfo::GetColumnRawBytes(const std::set& entityIds, const bool validation) const { ui64 sum = 0; const auto aggr = [&](const TColumnRecord& r) { @@ -54,8 +50,22 @@ ui64 TPortionInfo::GetColumnBlobBytes(const std::set& entityIds, const boo return sum; } -ui64 TPortionInfo::GetColumnBlobBytes(const std::vector& columnIds, const bool validation) const { - return GetColumnBlobBytes(std::set(columnIds.begin(), columnIds.end()), validation); +ui64 TPortionInfo::GetColumnRawBytes(const bool validation) const { + ui64 sum = 0; + const auto aggr = [&](const TColumnRecord& r) { + sum += r.GetMeta().GetRawBytes(); + }; + AggregateIndexChunksData(aggr, Records, nullptr, validation); + return sum; +} + +ui64 TPortionInfo::GetColumnBlobBytes(const bool validation) const { + ui64 sum = 0; + const auto aggr = [&](const TColumnRecord& r) { + sum += r.GetBlobRange().GetSize(); + }; + AggregateIndexChunksData(aggr, Records, nullptr, validation); + return sum; } ui64 TPortionInfo::GetIndexRawBytes(const std::set& entityIds, const bool validation) const { diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 9dd32e36a884..c29013dd6a7e 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -537,7 +537,8 @@ class TPortionInfo { return result; } - ui64 GetIndexRawBytes(const std::optional>& columnIds = {}, const bool validation = true) const; + ui64 GetIndexRawBytes(const std::set& columnIds, const bool validation = true) const; + ui64 GetIndexRawBytes(const bool validation = true) const; ui64 GetIndexBlobBytes() const noexcept { ui64 sum = 0; for (const auto& rec : Indexes) { @@ -546,11 +547,11 @@ class TPortionInfo { return sum; } - ui64 GetColumnRawBytes(const std::vector& columnIds, const bool validation = true) const; ui64 GetColumnRawBytes(const std::set& columnIds, const bool validation = true) const; + ui64 GetColumnRawBytes(const bool validation = true) const; - ui64 GetColumnBlobBytes(const std::vector& columnIds, const bool validation = true) const; - ui64 GetColumnBlobBytes(const std::optional>& columnIds = {}, const bool validation = true) const; + ui64 GetColumnBlobBytes(const std::set& columnIds, const bool validation = true) const; + ui64 GetColumnBlobBytes(const bool validation = true) const; ui64 GetTotalBlobBytes() const noexcept { return GetIndexBlobBytes() + GetColumnBlobBytes(); From 100f577f253ef419d30e97a0f6c4959f55b53d86 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 14:44:27 +0300 Subject: [PATCH 05/13] fix build --- .../engines/reader/plain_reader/iterator/context.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index 756262153600..0efd8bfbb9d2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -33,7 +33,7 @@ std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(con const bool needSnapshots = !source->GetExclusiveIntervalOnly() || ReadMetadata->GetRequestSnapshot() < source->GetRecordSnapshotMax() || !source->IsSourceInMemory(); const bool partialUsageByPK = [&]() { - switch (source->GetPKUsageClass()) { + switch (source->GetUsageClass()) { case TPKRangeFilter::EUsageClass::PartialUsage: return true; case TPKRangeFilter::EUsageClass::DontUsage: From 679e2d4354363ad659ad340a0d6b2e34a0d9e775 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 14:46:26 +0300 Subject: [PATCH 06/13] fix --- .../engines/reader/plain_reader/iterator/scanner.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp index a845caf684b2..177332f7671f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp @@ -141,21 +141,21 @@ class TSourcesStorageForMemoryOptimization { TSourceInfo(const std::shared_ptr& source, const std::shared_ptr& fetchingInfo) : Source(source) , FetchingInfo(fetchingInfo) { - Memory = fetching->PredictRawBytes(Source); + Memory = FetchingInfo->PredictRawBytes(Source); } NJson::TJsonValue DebugJson() const { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("source", Source->DebugJsonForMemory()); result.InsertValue("memory", Memory); - // result.InsertValue("fetching", Fetching->DebugJsonForMemory()); + // result.InsertValue("FetchingInfo", FetchingInfo->DebugJsonForMemory()); return result; } bool ReduceMemory() { const bool result = FetchingInfo->InitSourceSeqColumnIds(Source); if (result) { - Memory = fetching->PredictRawBytes(source); + Memory = FetchingInfo->PredictRawBytes(source); } return result; } @@ -186,9 +186,8 @@ class TSourcesStorageForMemoryOptimization { } void AddSource(const std::shared_ptr& source, const std::shared_ptr& fetching) { - const ui64 sourceMemory = fetching->PredictRawBytes(source); - MemorySum += sourceMemory; - Sources.emplace_back(TSourceInfo(sourceMemory, source, fetching)); + Sources.emplace_back(TSourceInfo(source, fetching)); + MemorySum += Sources.back().GetMemory(); } bool Optimize(const ui64 memoryLimit) { From a8aed02b0b43bfcff6b2aaf91a2c46b1c21687d2 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 14:53:22 +0300 Subject: [PATCH 07/13] correction --- .../plain_reader/constructor/read_metadata.h | 4 ++++ .../reader/plain_reader/iterator/scanner.cpp | 15 ++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h index 1e68dd77d789..cbd397bf366e 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h @@ -106,6 +106,10 @@ struct TReadMetadata : public TReadMetadataBase { return GetProgram().HasProcessingColumnIds(); } + ui64 GetPathId() const { + return PathId; + } + std::shared_ptr SelectInfo; NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; std::vector CommittedBlobs; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp index 177332f7671f..87de386beda9 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp @@ -155,7 +155,7 @@ class TSourcesStorageForMemoryOptimization { bool ReduceMemory() { const bool result = FetchingInfo->InitSourceSeqColumnIds(Source); if (result) { - Memory = FetchingInfo->PredictRawBytes(source); + Memory = FetchingInfo->PredictRawBytes(Source); } return result; } @@ -174,13 +174,10 @@ class TSourcesStorageForMemoryOptimization { NJson::TJsonValue resultJson; auto& memorySourcesArr = resultJson.InsertValue("sources_by_memory", NJson::JSON_ARRAY); resultJson.InsertValue("sources_by_memory_count", Sources.size()); - for (auto it = Sources.rbegin(); it != Sources.rend(); ++it) { + for (auto&& it: Sources) { auto& sourceMap = memorySourcesArr.AppendValue(NJson::JSON_MAP); - sourceMap.InsertValue("memory", it->first); auto& sourcesArr = sourceMap.InsertValue("sources", NJson::JSON_ARRAY); - for (auto&& s : it->second) { - sourcesArr.AppendValue(s.second.DebugJson()); - } + sourcesArr.AppendValue(it.DebugJson()); } return resultJson.GetStringRobust(); } @@ -231,16 +228,16 @@ TConclusionStatus TScanHead::DetectSourcesFeatureInContextIntervalScan( if (!optimizer.Optimize(Context->ReduceMemoryIntervalLimit) && Context->RejectMemoryIntervalLimit < optimizer.GetMemorySum()) { AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "next_internal_broken")("reason", "a lot of memory need")("start", startMemory)( "reduce_limit", Context->ReduceMemoryIntervalLimit)("reject_limit", Context->RejectMemoryIntervalLimit)( - "need", optimizer.GetMemorySum())("path_ids", JoinSeq(",", optimizer.GetPathIds()))( + "need", optimizer.GetMemorySum())("path_id", Context->GetReadMetadata()->GetPathId())( "details", IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD_SCAN) ? optimizer.DebugString() : "NEED_DEBUG_LEVEL"); Context->GetCommonContext()->GetCounters().OnOptimizedIntervalMemoryFailed(optimizer.GetMemorySum()); return TConclusionStatus::Fail("We need a lot of memory in time for interval scanner: " + ::ToString(optimizer.GetMemorySum()) + - " path_ids: " + JoinSeq(",", optimizer.GetPathIds()) + ". We need wait compaction processing. Sorry."); + " path_id: " + Context->GetReadMetadata()->GetPathId() + ". We need wait compaction processing. Sorry."); } else if (optimizer.GetMemorySum() < startMemory) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "memory_reduce_active")("reason", "need reduce memory")("start", startMemory)( "reduce_limit", Context->ReduceMemoryIntervalLimit)("reject_limit", Context->RejectMemoryIntervalLimit)( - "need", optimizer.GetMemorySum())("path_ids", JoinSeq(",", optimizer.GetPathIds())); + "need", optimizer.GetMemorySum())("path_id", Context->GetReadMetadata()->GetPathId()); Context->GetCommonContext()->GetCounters().OnOptimizedIntervalMemoryReduced(startMemory - optimizer.GetMemorySum()); } Context->GetCommonContext()->GetCounters().OnOptimizedIntervalMemoryRequired(optimizer.GetMemorySum()); From b9788b21339effecefe564e907799fef32006b36 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 14:55:14 +0300 Subject: [PATCH 08/13] fix build --- .../tx/columnshard/engines/portions/portion_info.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 0b3bc4703aad..0605d5ffcbab 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -77,6 +77,15 @@ ui64 TPortionInfo::GetIndexRawBytes(const std::set& entityIds, const bool return sum; } +ui64 TPortionInfo::GetIndexRawBytes(const bool validation) const { + ui64 sum = 0; + const auto aggr = [&](const TIndexChunk& r) { + sum += r.GetRawBytes(); + }; + AggregateIndexChunksData(aggr, Indexes, nullptr, validation); + return sum; +} + TString TPortionInfo::DebugString(const bool withDetails) const { TStringBuilder sb; sb << "(portion_id:" << Portion << ";" << From 9fe1a7b77ac2fa0f16b7889b60cfa30a4129ba02 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 15:18:06 +0300 Subject: [PATCH 09/13] skip not used insertions --- .../engines/insert_table/committed.h | 23 ++++++-------- .../engines/insert_table/insert_table.cpp | 31 +++++++++++++++---- .../engines/insert_table/insert_table.h | 7 +++-- .../engines/insert_table/rt_insertion.cpp | 4 ++- .../engines/reader/abstract/read_metadata.cpp | 3 +- .../plain_reader/iterator/plain_read_data.cpp | 6 ++-- .../reader/plain_reader/iterator/source.h | 5 ++- 7 files changed, 49 insertions(+), 30 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/insert_table/committed.h b/ydb/core/tx/columnshard/engines/insert_table/committed.h index ed2ffdadb3ca..bd633647b5ec 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/committed.h +++ b/ydb/core/tx/columnshard/engines/insert_table/committed.h @@ -56,27 +56,24 @@ class TCommittedBlob { YDB_READONLY(ui64, SchemaVersion, 0); YDB_READONLY(ui64, RecordsCount, 0); YDB_READONLY(bool, IsDelete, false); - YDB_READONLY_DEF(std::optional, First); - YDB_READONLY_DEF(std::optional, Last); + NArrow::TReplaceKey First; + NArrow::TReplaceKey Last; YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset); public: - ui64 GetSize() const { - return BlobRange.Size; + const NArrow::TReplaceKey& GetFirst() const { + return First; } - - const NArrow::TReplaceKey& GetFirstVerified() const { - Y_ABORT_UNLESS(First); - return *First; + const NArrow::TReplaceKey& GetLast() const { + return Last; } - const NArrow::TReplaceKey& GetLastVerified() const { - Y_ABORT_UNLESS(Last); - return *Last; + ui64 GetSize() const { + return BlobRange.Size; } TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount, - const std::optional& first, const std::optional& last, const bool isDelete, + const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete, const NArrow::TSchemaSubset& subset) : BlobRange(blobRange) , WriteInfo(snapshot) @@ -89,7 +86,7 @@ class TCommittedBlob { } TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount, - const std::optional& first, const std::optional& last, const bool isDelete, + const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete, const NArrow::TSchemaSubset& subset) : BlobRange(blobRange) , WriteInfo(writeId) diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index a948c0077077..18e69a2b0f55 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -13,6 +13,7 @@ bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) { dbTable.Insert(*dataPtr); return true; } else { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_insertion"); return false; } } @@ -39,11 +40,15 @@ TInsertionSummary::TCounters TInsertTable::Commit( auto* pathInfo = Summary.GetPathInfoOptional(pathId); // There could be commit after drop: propose, drop, plan if (pathInfo && pathExists(pathId)) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "commit_insertion")("path_id", data->GetPathId())( + "blob_range", data->GetBlobRange().ToString()); auto committed = data->Commit(planStep, txId); dbTable.Commit(committed); pathInfo->AddCommitted(std::move(committed)); } else { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "abort_insertion")("path_id", data->GetPathId())( + "blob_range", data->GetBlobRange().ToString()); dbTable.Abort(*data); Summary.AddAborted(std::move(*data)); } @@ -58,6 +63,8 @@ void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet& wr for (auto writeId : writeIds) { // There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId. if (std::optional data = Summary.ExtractInserted(writeId)) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "abort_insertion")("path_id", data->GetPathId())( + "blob_range", data->GetBlobRange().ToString())("write_id", writeId); dbTable.EraseInserted(*data); dbTable.Abort(*data); Summary.AddAborted(std::move(*data)); @@ -108,8 +115,8 @@ bool TInsertTable::Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant return dbTable.Load(*this, loadTime); } -std::vector TInsertTable::Read( - ui64 pathId, const std::optional lockId, const TSnapshot& reqSnapshot, const std::shared_ptr& pkSchema) const { +std::vector TInsertTable::Read(ui64 pathId, const std::optional lockId, const TSnapshot& reqSnapshot, + const std::shared_ptr& pkSchema, const TPKRangesFilter* pkRangesFilter) const { const TPathInfo* pInfo = Summary.GetPathInfoOptional(pathId); if (!pInfo) { return {}; @@ -120,15 +127,27 @@ std::vector TInsertTable::Read( for (const auto& data : pInfo->GetCommitted()) { if (lockId || data.GetSnapshot() <= reqSnapshot) { + auto start = data.GetMeta().GetFirstPK(pkSchema); + auto finish = data.GetMeta().GetLastPK(pkSchema); + AFL_VERIFY(start); + AFL_VERIFY(finish); + if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(*start, *finish) == TPKRangeFilter::EUsageClass::DontUsage) { + continue; + } result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(), - data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema), - data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); + *start, *finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); } } if (lockId) { for (const auto& [writeId, data] : pInfo->GetInserted()) { - result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(), - data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema), + auto start = data.GetMeta().GetFirstPK(pkSchema); + auto finish = data.GetMeta().GetLastPK(pkSchema); + AFL_VERIFY(start); + AFL_VERIFY(finish); + if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(*start, *finish) == TPKRangeFilter::EUsageClass::DontUsage) { + continue; + } + result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(), *start, *finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); } } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h index 324cbbf87946..b44e64312191 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h @@ -9,7 +9,7 @@ #include namespace NKikimr::NOlap { - +class TPKRangesFilter; class IDbWrapper; /// Use one table for inserted and committed blobs: @@ -57,6 +57,7 @@ class TInsertTableAccessor { return Summary.AddInserted(std::move(data), load); } bool AddAborted(TInsertedData&& data, const bool load) { + AFL_VERIFY_DEBUG(!Summary.ExtractInserted(data.GetInsertWriteId())); if (load) { AddBlobLink(data.GetBlobRange().BlobId); } @@ -114,8 +115,8 @@ class TInsertTable: public TInsertTableAccessor { void EraseAbortedOnExecute(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr& blobsAction); void EraseAbortedOnComplete(const TInsertedData& key); - std::vector Read( - ui64 pathId, const std::optional lockId, const TSnapshot& reqSnapshot, const std::shared_ptr& pkSchema) const; + std::vector Read(ui64 pathId, const std::optional lockId, const TSnapshot& reqSnapshot, + const std::shared_ptr& pkSchema, const TPKRangesFilter* pkRangesFilter) const; bool Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant loadTime); TInsertWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc); diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp index 8012211c418e..3ad39fcd209e 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp @@ -166,8 +166,9 @@ bool TInsertionSummary::HasCommitted(const TCommittedData& data) { const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) { const TInsertWriteId writeId = data.GetInsertWriteId(); Counters.Aborted.Add(data.BlobSize(), load); + AFL_VERIFY_DEBUG(!Inserted.contains(writeId)); auto insertInfo = Aborted.emplace(writeId, std::move(data)); - Y_ABORT_UNLESS(insertInfo.second); + AFL_VERIFY(insertInfo.second)("write_id", writeId); return &insertInfo.first->second; } @@ -191,6 +192,7 @@ const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedDat const ui32 dataSize = data.BlobSize(); const ui64 pathId = data.GetPathId(); auto insertInfo = Inserted.emplace(writeId, std::move(data)); + AFL_VERIFY_DEBUG(!Aborted.contains(writeId)); if (insertInfo.second) { OnNewInserted(GetPathInfo(pathId), dataSize, load); return &insertInfo.first->second; diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.cpp index dd01511460c2..88416a4d214f 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.cpp @@ -25,7 +25,8 @@ ISnapshotSchema::TPtr TReadMetadataBase::GetLoadSchemaVerified(const TPortionInf std::vector TDataStorageAccessor::GetCommitedBlobs(const TReadDescription& readDescription, const std::shared_ptr& pkSchema, const std::optional lockId, const TSnapshot& reqSnapshot) const { - return std::move(InsertTable->Read(readDescription.PathId, lockId, reqSnapshot, pkSchema)); + AFL_VERIFY(readDescription.PKRangesFilter); + return std::move(InsertTable->Read(readDescription.PathId, lockId, reqSnapshot, pkSchema, &*readDescription.PKRangesFilter)); } } // namespace NKikimr::NOlap::NReader diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp index ec228c363b90..04ed0d1c6f26 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp @@ -28,8 +28,8 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) if (GetReadMetadata()->IsMyUncommitted(i.GetWriteIdVerified())) { continue; } - if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirstVerified()) || - GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLastVerified())) { + if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) || + GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) { GetReadMetadata()->SetConflictedWriteId(i.GetWriteIdVerified()); } } @@ -39,7 +39,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& context) if (GetReadMetadata()->IsWriteConflictable(i.GetWriteIdVerified())) { continue; } - } else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirstVerified(), i.GetLastVerified()) == + } else if (GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(i.GetFirst(), i.GetLast()) == TPKRangeFilter::EUsageClass::DontUsage) { continue; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 8121c73a6296..879c42d142a6 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -456,9 +456,8 @@ class TCommittedDataSource: public IDataSource { } TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr& context) - : TBase(sourceIdx, context, committed.GetFirstVerified(), committed.GetLastVerified(), committed.GetSnapshotDef(TSnapshot::Zero()), - committed.GetSnapshotDef(TSnapshot::Zero()), - committed.GetRecordsCount(), {}, committed.GetIsDelete()) + : TBase(sourceIdx, context, committed.GetFirst(), committed.GetLast(), committed.GetSnapshotDef(TSnapshot::Zero()), + committed.GetSnapshotDef(TSnapshot::Zero()), committed.GetRecordsCount(), {}, committed.GetIsDelete()) , CommittedBlob(committed) { } }; From 0adff7a81b442af896fb6fb5907958b3f17f58c4 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 15:39:37 +0300 Subject: [PATCH 10/13] use fingerprinted data for cache --- .../engines/reader/plain_reader/iterator/source.h | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 879c42d142a6..889f9fe5e7d4 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -266,6 +266,7 @@ class TPortionDataSource: public IDataSource { std::set SequentialEntityIds; std::shared_ptr Portion; std::shared_ptr Schema; + mutable THashMap FingerprintedData; void NeedFetchColumns(const std::set& columnIds, TBlobsAction& blobsAction, THashMap& nullBlocks, const std::shared_ptr& filter); @@ -307,6 +308,7 @@ class TPortionDataSource: public IDataSource { return Portion->GetPathId(); } virtual bool DoAddSequentialEntityIds(const ui32 entityId) override { + FingerprintedData.clear(); return SequentialEntityIds.emplace(entityId).second; } @@ -334,6 +336,13 @@ class TPortionDataSource: public IDataSource { } virtual ui64 GetColumnRawBytes(const std::set& columnsIds) const override { + AFL_VERIFY(columnsIds.size()); + const ui64 fp = CombineHashes(*columnsIds.begin(), *columnsIds.rbegin()); + auto it = FingerprintedData.find(fp); + if (it != FingerprintedData.end()) { + return it->second; + } + ui64 result = 0; if (SequentialEntityIds.size()) { std::set selectedSeq; std::set selectedInMem; @@ -344,11 +353,13 @@ class TPortionDataSource: public IDataSource { selectedInMem.emplace(i); } } - return Portion->GetMinMemoryForReadColumns(selectedSeq) + Portion->GetColumnBlobBytes(selectedSeq) + + result = Portion->GetMinMemoryForReadColumns(selectedSeq) + Portion->GetColumnBlobBytes(selectedSeq) + Portion->GetColumnRawBytes(selectedInMem, false); } else { - return Portion->GetColumnRawBytes(columnsIds, false); + result = Portion->GetColumnRawBytes(columnsIds, false); } + FingerprintedData.emplace(fp, result); + return result; } virtual ui64 GetColumnBlobBytes(const std::set& columnsIds) const override { From 015a385babf89dddb209036f14fdec2d818b0edb Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 17:24:13 +0300 Subject: [PATCH 11/13] fix build test --- .../tx/columnshard/engines/ut/ut_insert_table.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp index 240d33ab7f2c..1fa189536128 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp @@ -99,9 +99,9 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { UNIT_ASSERT(!ok); // read nothing - auto blobs = insertTable.Read(tableId, {}, TSnapshot::Zero(), TLocalHelper::GetMetaSchema()); + auto blobs = insertTable.Read(tableId, {}, TSnapshot::Zero(), TLocalHelper::GetMetaSchema(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); - blobs = insertTable.Read(tableId + 1, {}, TSnapshot::Zero(), TLocalHelper::GetMetaSchema()); + blobs = insertTable.Read(tableId + 1, {}, TSnapshot::Zero(), TLocalHelper::GetMetaSchema(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); // commit @@ -115,15 +115,15 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { // UNIT_ASSERT_EQUAL((*insertTable.GetPathPriorities().begin()->second.begin())->GetCommitted().size(), 1); // read old snapshot - blobs = insertTable.Read(tableId, {}, TSnapshot::Zero(), TLocalHelper::GetMetaSchema()); + blobs = insertTable.Read(tableId, {}, TSnapshot::Zero(), TLocalHelper::GetMetaSchema(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); - blobs = insertTable.Read(tableId + 1, {}, TSnapshot::Zero(), TLocalHelper::GetMetaSchema()); + blobs = insertTable.Read(tableId + 1, {}, TSnapshot::Zero(), TLocalHelper::GetMetaSchema(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); // read new snapshot - blobs = insertTable.Read(tableId, {}, TSnapshot(planStep, txId), TLocalHelper::GetMetaSchema()); + blobs = insertTable.Read(tableId, {}, TSnapshot(planStep, txId), TLocalHelper::GetMetaSchema(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 1); - blobs = insertTable.Read(tableId + 1, {}, TSnapshot::Zero(), TLocalHelper::GetMetaSchema()); + blobs = insertTable.Read(tableId + 1, {}, TSnapshot::Zero(), TLocalHelper::GetMetaSchema(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); } } From 08ef708d759bcb6922fa3b0a5bc688f60bb061df Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 11 Sep 2024 19:36:20 +0300 Subject: [PATCH 12/13] fix --- .../columnshard/engines/predicate/range.cpp | 20 +------------------ 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp index b47f88b2e324..83c6602d242d 100644 --- a/ydb/core/tx/columnshard/engines/predicate/range.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp @@ -40,25 +40,7 @@ NKikimr::NArrow::TColumnFilter TPKRangeFilter::BuildFilter(const arrow::Datum& d } bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info) const { - if (const auto& from = PredicateFrom.GetReplaceKey()) { - const auto& portionEnd = info.IndexKeyEnd(); - const int commonSize = std::min(from->Size(), portionEnd.Size()); - if (std::is_gt(from->ComparePartNotNull(portionEnd, commonSize))) { - return false; - } - } - - if (const auto& to = PredicateTo.GetReplaceKey()) { - const auto& portionStart = info.IndexKeyStart(); - const int commonSize = std::min(to->Size(), portionStart.Size()); - if (std::is_lt(to->ComparePartNotNull(portionStart, commonSize))) { - return false; - } - } -// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", info.IndexKeyStart().DebugString())("end", info.IndexKeyEnd().DebugString())( -// "from", PredicateFrom.DebugString())("to", PredicateTo.DebugString()); - - return true; + return IsPortionInPartialUsage(info.IndexKeyStart(), info.IndexKeyEnd()) != TPKRangeFilter::EUsageClass::DontUsage; } TPKRangeFilter::EUsageClass TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end) const { From a342294a1cb3d28921846a33c2674f44a4d61650 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 12 Sep 2024 12:55:51 +0300 Subject: [PATCH 13/13] fix build --- .../engines/insert_table/insert_table.cpp | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 18e69a2b0f55..16cbe6ff616f 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -129,25 +129,21 @@ std::vector TInsertTable::Read(ui64 pathId, const std::optional< if (lockId || data.GetSnapshot() <= reqSnapshot) { auto start = data.GetMeta().GetFirstPK(pkSchema); auto finish = data.GetMeta().GetLastPK(pkSchema); - AFL_VERIFY(start); - AFL_VERIFY(finish); - if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(*start, *finish) == TPKRangeFilter::EUsageClass::DontUsage) { + if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) { continue; } result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(), - *start, *finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); + start, finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); } } if (lockId) { for (const auto& [writeId, data] : pInfo->GetInserted()) { auto start = data.GetMeta().GetFirstPK(pkSchema); auto finish = data.GetMeta().GetLastPK(pkSchema); - AFL_VERIFY(start); - AFL_VERIFY(finish); - if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(*start, *finish) == TPKRangeFilter::EUsageClass::DontUsage) { + if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) { continue; } - result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(), *start, *finish, + result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(), start, finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset())); } }