From 0d5a97e4881e65e9a6cffa8e98f07f4619046780 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 3 Jan 2025 14:32:57 +0300 Subject: [PATCH] correction --- .../blobs_action/counters/storage.h | 1 + .../columnshard/columnshard__statistics.cpp | 107 ++++++++++++++---- .../engines/portions/data_accessor.cpp | 27 +++++ .../engines/portions/data_accessor.h | 16 +++ 4 files changed, 129 insertions(+), 22 deletions(-) diff --git a/ydb/core/tx/columnshard/blobs_action/counters/storage.h b/ydb/core/tx/columnshard/blobs_action/counters/storage.h index 617c952c84c5..1ba6135f82fa 100644 --- a/ydb/core/tx/columnshard/blobs_action/counters/storage.h +++ b/ydb/core/tx/columnshard/blobs_action/counters/storage.h @@ -26,6 +26,7 @@ enum class EConsumer { WRITING_BUFFER, WRITING_OPERATOR, NORMALIZER, + STATISTICS, COUNT }; diff --git a/ydb/core/tx/columnshard/columnshard__statistics.cpp b/ydb/core/tx/columnshard/columnshard__statistics.cpp index c482b73a7268..4bd312c765df 100644 --- a/ydb/core/tx/columnshard/columnshard__statistics.cpp +++ b/ydb/core/tx/columnshard/columnshard__statistics.cpp @@ -1,5 +1,6 @@ #include "columnshard.h" #include "columnshard_impl.h" + #include "ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h" #include @@ -7,7 +8,6 @@ #include - namespace NKikimr::NColumnShard { void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const TActorContext&) { @@ -15,8 +15,6 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const // TODO Start a potentially long analysis process. // ... - - // Return the response when the analysis is completed auto response = std::make_unique(); auto& responseRecord = response->Record; @@ -64,8 +62,7 @@ class TResultAccumulator { std::unique_ptr&& response) : RequestSenderActorId(requestSenderActorId) , Cookie(cookie) - , Response(std::move(response)) - { + , Response(std::move(response)) { for (auto&& i : tags) { AFL_VERIFY(Calculated.emplace(i, nullptr).second); } @@ -104,11 +101,11 @@ class TResultAccumulator { OnResultReady(); } } - }; class TColumnPortionsAccumulator { private: + const std::shared_ptr StoragesManager; const std::set ColumnTagsRequested; std::vector Portions; const ui32 PortionsCountLimit = 10000; @@ -117,19 +114,66 @@ class TColumnPortionsAccumulator { const std::shared_ptr VersionedIndex; public: - TColumnPortionsAccumulator(const std::shared_ptr& result, const ui32 portionsCountLimit, + TColumnPortionsAccumulator(const std::shared_ptr& storagesManager, const std::shared_ptr& result, + const ui32 portionsCountLimit, const std::set& originalColumnTags, const std::shared_ptr& vIndex, const std::shared_ptr& dataAccessorsManager) - : ColumnTagsRequested(originalColumnTags) + : StoragesManager(storagesManager) + , ColumnTagsRequested(originalColumnTags) , PortionsCountLimit(portionsCountLimit) , DataAccessors(dataAccessorsManager) , Result(result) - , VersionedIndex(vIndex) - { + , VersionedIndex(vIndex) { } + class TIndexReadTask: public NOlap::NBlobOperations::NRead::ITask { + private: + using TBase = NOlap::NBlobOperations::NRead::ITask; + const std::shared_ptr Result; + THashMap>> RangesByColumn; + THashMap> SketchesByColumns; + + protected: + virtual void DoOnDataReady(const std::shared_ptr& /*resourcesGuard*/) override { + TCompositeReadBlobs blobs = ExtractBlobsData(); + for (auto&& [columnId, data] : RangesByColumn) { + for (auto&& [storageId, blobs] : data) { + for (auto&& b : blobs) { + const TString blob = blobs.Extract(storageId, b); + auto sketch = std::unique_ptr(TCountMinSketch::FromString(blob.data(), blob.size())); + *sketchesByColumns[columnId] += *sketch; + } + } + } + Result->AddResult(std::move(sketchesByColumns)); + } + + virtual bool DoOnError( + const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)( + "status", status.GetErrorMessage())("status_code", status.GetStatus()); + AFL_VERIFY(status.GetStatus() != NKikimrProto::EReplyStatus::NODATA)("blob_id", range)("status", status.GetStatus())( + "error", status.GetErrorMessage())("type", TxEvent->IndexChanges->TypeString())( + "task_id", TxEvent->IndexChanges->GetTaskIdentifier())("debug", TxEvent->IndexChanges->DebugString()); + return false; + } + + public: + TIndexReadTask(const std::shared_ptr& result, std::vector>&& readingActions, + THashMap>>&& rangesByColumn, + THashMap>&& readySketches) + : TBase(std::move(readingActions), "STATISTICS", "STATISTICS") + , Result(result) + , RangesByColumn(std::move(RangesByColumn)) + , SketchesByColumns(std::move(readySketches)) { + AFL_VERIFY(!!Result); + AFL_VERIFY(RangesByColumn.size()); + } + }; + class TMetadataSubscriber: public NOlap::IDataAccessorRequestsSubscriber { private: + const std::shared_ptr StoragesManager; const std::shared_ptr Result; std::shared_ptr VersionedIndex; const std::set ColumnTagsRequested; @@ -143,6 +187,8 @@ class TColumnPortionsAccumulator { sketchesByColumns.emplace(id, TCountMinSketch::Create()); } + THashMap>> rangesByColumn; + for (const auto& [id, portionInfo] : result.GetPortions()) { std::shared_ptr portionSchema = portionInfo.GetPortionInfo().GetSchema(*VersionedIndex); for (const ui32 columnId : ColumnTagsRequested) { @@ -154,16 +200,34 @@ class TColumnPortionsAccumulator { } AFL_VERIFY(indexMeta->GetColumnIds().size() == 1); - const std::vector data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId()); + if (!indexMeta->IsInplaceData()) { + portionInfo.FillBlobRangesByStorage(rangesByColumn, portionSchema->GetIndexInfo(), { columnId }); + } else { + const std::vector data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId()); - for (const auto& sketchAsString : data) { - auto sketch = - std::unique_ptr(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size())); - *sketchesByColumns[columnId] += *sketch; + for (const auto& sketchAsString : data) { + auto sketch = + std::unique_ptr(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size())); + *sketchesByColumns[columnId] += *sketch; + } } } } - Result->AddResult(std::move(sketchesByColumns)); + if (rangesByColumn.size()) { + TBlobsAction blobsAction(StoragesManager, NBlobOperations::EConsumer::STATISTICS); + for (auto&& i : rangesByColumn) { + for (auto&& [storageId, ranges]: i.second) { + auto reader = blobsAction.GetReading(storageId); + for (auto&& i : ranges) { + reader->AddRange(i); + } + } + } + TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor( + std::make_shared(Result, blobsAction.GetReadingActions(), rangesByColumn, sketchesByColumns))); + } else { + Result->AddResult(std::move(sketchesByColumns)); + } } public: @@ -171,9 +235,7 @@ class TColumnPortionsAccumulator { const std::shared_ptr& result, const std::shared_ptr& vIndex, const std::set& tags) : Result(result) , VersionedIndex(vIndex) - , ColumnTagsRequested(tags) - { - + , ColumnTagsRequested(tags) { } }; @@ -186,7 +248,7 @@ class TColumnPortionsAccumulator { for (auto&& i : Portions) { request->AddPortion(i); } - request->RegisterSubscriber(std::make_shared(Result, VersionedIndex, ColumnTagsRequested)); + request->RegisterSubscriber(std::make_shared(StoragesManager, Result, VersionedIndex, ColumnTagsRequested)); Portions.clear(); DataAccessors->AskData(request); } @@ -234,7 +296,8 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, std::shared_ptr resultAccumulator = std::make_shared(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response)); auto versionedIndex = std::make_shared(index.GetVersionedIndex()); - TColumnPortionsAccumulator portionsPack(resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified()); + TColumnPortionsAccumulator portionsPack(StoragesManager, + resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified()); for (const auto& [_, portionInfo] : spg->GetPortions()) { if (!portionInfo->IsVisible(GetMaxReadVersion())) { @@ -246,4 +309,4 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, resultAccumulator->Start(); } -} +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp index 61209dcee4de..3a09ccea9441 100644 --- a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp +++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp @@ -116,6 +116,33 @@ TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssembl return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent); } +void TPortionDataAccessor::FillBlobRangesByStorage(THashMap>>& result, + const TVersionedIndex& index, const THashSet& entityIds) const { + auto schema = PortionInfo->GetSchema(index); + return FillBlobRangesByStorage(result, schema->GetIndexInfo()); +} + +void TPortionDataAccessor::FillBlobRangesByStorage( + THashMap>>& result, const TIndexInfo& indexInfo, const THashSet& entityIds) const { + for (auto&& i : GetRecordsVerified()) { + if (!entityIds.contains(i.GetEntityId())) { + continue; + } + const TString& storageId = PortionInfo->GetColumnStorageId(i.GetColumnId(), indexInfo); + AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(i.GetBlobRange())).second)( + "blob_id", PortionInfo->RestoreBlobRange(i.GetBlobRange()).ToString()); + } + for (auto&& i : GetIndexesVerified()) { + if (!entityIds.contains(i.GetEntityId())) { + continue; + } + const TString& storageId = PortionInfo->GetIndexStorageId(i.GetIndexId(), indexInfo); + auto bRange = i.GetBlobRangeVerified(); + AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(bRange)).second)( + "blob_id", PortionInfo->RestoreBlobRange(bRange).ToString()); + } +} + void TPortionDataAccessor::FillBlobRangesByStorage(THashMap>& result, const TVersionedIndex& index) const { auto schema = PortionInfo->GetSchema(index); return FillBlobRangesByStorage(result, schema->GetIndexInfo()); diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.h b/ydb/core/tx/columnshard/engines/portions/data_accessor.h index fadbf8f8cc30..975e6440fc42 100644 --- a/ydb/core/tx/columnshard/engines/portions/data_accessor.h +++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.h @@ -181,6 +181,19 @@ class TPortionDataAccessor { return result; } + THashMap> GetIndexRangesVerified(const ui32 indexId) const { + if (!Indexes) { + return {}; + } + THashMap> result; + for (auto&& i : *Indexes) { + if (i.GetEntityId() == indexId) { + result.emplace_back(i.GetBlobDataVerified()); + } + } + return result; + } + std::set GetColumnIds() const { std::set result; for (auto&& i : GetRecordsVerified()) { @@ -229,6 +242,9 @@ class TPortionDataAccessor { void FillBlobRangesByStorage(THashMap>& result, const TIndexInfo& indexInfo) const; void FillBlobRangesByStorage(THashMap>& result, const TVersionedIndex& index) const; + void FillBlobRangesByStorage(THashMap>>& result, const TIndexInfo& indexInfo, const THashSet& entityIds) const; + void FillBlobRangesByStorage( + THashMap>>& result, const TVersionedIndex& index, const THashSet& entityIds) const; void FillBlobIdsByStorage(THashMap>& result, const TIndexInfo& indexInfo) const; void FillBlobIdsByStorage(THashMap>& result, const TVersionedIndex& index) const;