diff --git a/ydb/core/tx/columnshard/blobs_action/counters/storage.h b/ydb/core/tx/columnshard/blobs_action/counters/storage.h index 617c952c84c5..1ba6135f82fa 100644 --- a/ydb/core/tx/columnshard/blobs_action/counters/storage.h +++ b/ydb/core/tx/columnshard/blobs_action/counters/storage.h @@ -26,6 +26,7 @@ enum class EConsumer { WRITING_BUFFER, WRITING_OPERATOR, NORMALIZER, + STATISTICS, COUNT }; diff --git a/ydb/core/tx/columnshard/columnshard__statistics.cpp b/ydb/core/tx/columnshard/columnshard__statistics.cpp index c482b73a7268..d9aabb65a905 100644 --- a/ydb/core/tx/columnshard/columnshard__statistics.cpp +++ b/ydb/core/tx/columnshard/columnshard__statistics.cpp @@ -1,13 +1,14 @@ #include "columnshard.h" #include "columnshard_impl.h" + #include "ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h" #include +#include #include #include - namespace NKikimr::NColumnShard { void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const TActorContext&) { @@ -15,8 +16,6 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const // TODO Start a potentially long analysis process. // ... - - // Return the response when the analysis is completed auto response = std::make_unique(); auto& responseRecord = response->Record; @@ -64,8 +63,7 @@ class TResultAccumulator { std::unique_ptr&& response) : RequestSenderActorId(requestSenderActorId) , Cookie(cookie) - , Response(std::move(response)) - { + , Response(std::move(response)) { for (auto&& i : tags) { AFL_VERIFY(Calculated.emplace(i, nullptr).second); } @@ -104,11 +102,11 @@ class TResultAccumulator { OnResultReady(); } } - }; class TColumnPortionsAccumulator { private: + const std::shared_ptr StoragesManager; const std::set ColumnTagsRequested; std::vector Portions; const ui32 PortionsCountLimit = 10000; @@ -117,19 +115,68 @@ class TColumnPortionsAccumulator { const std::shared_ptr VersionedIndex; public: - TColumnPortionsAccumulator(const std::shared_ptr& result, const ui32 portionsCountLimit, - const std::set& originalColumnTags, const std::shared_ptr& vIndex, + TColumnPortionsAccumulator(const std::shared_ptr& storagesManager, + const std::shared_ptr& result, const ui32 portionsCountLimit, const std::set& originalColumnTags, + const std::shared_ptr& vIndex, const std::shared_ptr& dataAccessorsManager) - : ColumnTagsRequested(originalColumnTags) + : StoragesManager(storagesManager) + , ColumnTagsRequested(originalColumnTags) , PortionsCountLimit(portionsCountLimit) , DataAccessors(dataAccessorsManager) , Result(result) - , VersionedIndex(vIndex) - { + , VersionedIndex(vIndex) { } + class TIndexReadTask: public NOlap::NBlobOperations::NRead::ITask { + private: + using TBase = NOlap::NBlobOperations::NRead::ITask; + const std::shared_ptr Result; + THashMap>> RangesByColumn; + THashMap> SketchesByColumns; + + protected: + virtual void DoOnDataReady(const std::shared_ptr& /*resourcesGuard*/) override { + NOlap::NBlobOperations::NRead::TCompositeReadBlobs blobsData = ExtractBlobsData(); + for (auto&& [columnId, data] : RangesByColumn) { + for (auto&& [storageId, blobs] : data) { + for (auto&& b : blobs) { + const TString blob = blobsData.Extract(storageId, b); + auto sketch = std::unique_ptr(TCountMinSketch::FromString(blob.data(), blob.size())); + auto it = SketchesByColumns.find(columnId); + AFL_VERIFY(it != SketchesByColumns.end()); + *it->second += *sketch; + } + } + } + Result->AddResult(std::move(SketchesByColumns)); + } + + virtual bool DoOnError( + const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)( + "status", status.GetErrorMessage())("status_code", status.GetStatus()); + AFL_VERIFY(status.GetStatus() != NKikimrProto::EReplyStatus::NODATA)("blob_id", range)("status", status.GetStatus())( + "error", status.GetErrorMessage())("type", "STATISTICS"); + return false; + } + + public: + TIndexReadTask(const std::shared_ptr& result, + std::vector>&& readingActions, + THashMap>>&& rangesByColumn, + THashMap>&& readySketches) + : TBase(std::move(readingActions), "STATISTICS", "STATISTICS") + , Result(result) + , RangesByColumn(std::move(rangesByColumn)) + , SketchesByColumns(std::move(readySketches)) { + AFL_VERIFY(!!Result); + AFL_VERIFY(RangesByColumn.size()); + } + }; + class TMetadataSubscriber: public NOlap::IDataAccessorRequestsSubscriber { private: + const std::shared_ptr StoragesManager; const std::shared_ptr Result; std::shared_ptr VersionedIndex; const std::set ColumnTagsRequested; @@ -143,6 +190,9 @@ class TColumnPortionsAccumulator { sketchesByColumns.emplace(id, TCountMinSketch::Create()); } + THashMap>> rangesByColumn; + THashMap indexIdToColumnId; + for (const auto& [id, portionInfo] : result.GetPortions()) { std::shared_ptr portionSchema = portionInfo.GetPortionInfo().GetSchema(*VersionedIndex); for (const ui32 columnId : ColumnTagsRequested) { @@ -153,27 +203,48 @@ class TColumnPortionsAccumulator { continue; } AFL_VERIFY(indexMeta->GetColumnIds().size() == 1); - - const std::vector data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId()); - - for (const auto& sketchAsString : data) { - auto sketch = - std::unique_ptr(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size())); - *sketchesByColumns[columnId] += *sketch; + indexIdToColumnId.emplace(indexMeta->GetIndexId(), columnId); + if (!indexMeta->IsInplaceData()) { + portionInfo.FillBlobRangesByStorage(rangesByColumn, portionSchema->GetIndexInfo(), { indexMeta->GetIndexId() }); + } else { + const std::vector data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId()); + + for (const auto& sketchAsString : data) { + auto sketch = + std::unique_ptr(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size())); + *sketchesByColumns[columnId] += *sketch; + } } } } - Result->AddResult(std::move(sketchesByColumns)); + if (rangesByColumn.size()) { + NOlap::TBlobsAction blobsAction(StoragesManager, NOlap::NBlobOperations::EConsumer::STATISTICS); + THashMap>> rangesByColumnLocal; + for (auto&& i : rangesByColumn) { + for (auto&& [storageId, ranges] : i.second) { + auto reader = blobsAction.GetReading(storageId); + for (auto&& i : ranges) { + reader->AddRange(i); + } + } + auto it = indexIdToColumnId.find(i.first); + AFL_VERIFY(it != indexIdToColumnId.end()); + rangesByColumnLocal.emplace(it->second, std::move(i.second)); + } + TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(std::make_shared( + Result, blobsAction.GetReadingActions(), std::move(rangesByColumnLocal), std::move(sketchesByColumns)))); + } else { + Result->AddResult(std::move(sketchesByColumns)); + } } public: - TMetadataSubscriber( - const std::shared_ptr& result, const std::shared_ptr& vIndex, const std::set& tags) - : Result(result) + TMetadataSubscriber(const std::shared_ptr& storagesManager, const std::shared_ptr& result, + const std::shared_ptr& vIndex, const std::set& tags) + : StoragesManager(storagesManager) + , Result(result) , VersionedIndex(vIndex) - , ColumnTagsRequested(tags) - { - + , ColumnTagsRequested(tags) { } }; @@ -186,7 +257,7 @@ class TColumnPortionsAccumulator { for (auto&& i : Portions) { request->AddPortion(i); } - request->RegisterSubscriber(std::make_shared(Result, VersionedIndex, ColumnTagsRequested)); + request->RegisterSubscriber(std::make_shared(StoragesManager, Result, VersionedIndex, ColumnTagsRequested)); Portions.clear(); DataAccessors->AskData(request); } @@ -234,7 +305,8 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, std::shared_ptr resultAccumulator = std::make_shared(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response)); auto versionedIndex = std::make_shared(index.GetVersionedIndex()); - TColumnPortionsAccumulator portionsPack(resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified()); + TColumnPortionsAccumulator portionsPack( + StoragesManager, resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified()); for (const auto& [_, portionInfo] : spg->GetPortions()) { if (!portionInfo->IsVisible(GetMaxReadVersion())) { @@ -246,4 +318,4 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, resultAccumulator->Start(); } -} +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp index 61209dcee4de..0e2ff7468055 100644 --- a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp +++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp @@ -116,6 +116,33 @@ TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssembl return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent); } +void TPortionDataAccessor::FillBlobRangesByStorage(THashMap>>& result, + const TVersionedIndex& index, const THashSet& entityIds) const { + auto schema = PortionInfo->GetSchema(index); + return FillBlobRangesByStorage(result, schema->GetIndexInfo(), entityIds); +} + +void TPortionDataAccessor::FillBlobRangesByStorage( + THashMap>>& result, const TIndexInfo& indexInfo, const THashSet& entityIds) const { + for (auto&& i : GetRecordsVerified()) { + if (!entityIds.contains(i.GetEntityId())) { + continue; + } + const TString& storageId = PortionInfo->GetColumnStorageId(i.GetColumnId(), indexInfo); + AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(i.GetBlobRange())).second)( + "blob_id", PortionInfo->RestoreBlobRange(i.GetBlobRange()).ToString()); + } + for (auto&& i : GetIndexesVerified()) { + if (!entityIds.contains(i.GetEntityId())) { + continue; + } + const TString& storageId = PortionInfo->GetIndexStorageId(i.GetIndexId(), indexInfo); + auto bRange = i.GetBlobRangeVerified(); + AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(bRange)).second)( + "blob_id", PortionInfo->RestoreBlobRange(bRange).ToString()); + } +} + void TPortionDataAccessor::FillBlobRangesByStorage(THashMap>& result, const TVersionedIndex& index) const { auto schema = PortionInfo->GetSchema(index); return FillBlobRangesByStorage(result, schema->GetIndexInfo()); diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.h b/ydb/core/tx/columnshard/engines/portions/data_accessor.h index fadbf8f8cc30..48b8509cfc50 100644 --- a/ydb/core/tx/columnshard/engines/portions/data_accessor.h +++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.h @@ -229,6 +229,9 @@ class TPortionDataAccessor { void FillBlobRangesByStorage(THashMap>& result, const TIndexInfo& indexInfo) const; void FillBlobRangesByStorage(THashMap>& result, const TVersionedIndex& index) const; + void FillBlobRangesByStorage(THashMap>>& result, const TIndexInfo& indexInfo, const THashSet& entityIds) const; + void FillBlobRangesByStorage( + THashMap>>& result, const TVersionedIndex& index, const THashSet& entityIds) const; void FillBlobIdsByStorage(THashMap>& result, const TIndexInfo& indexInfo) const; void FillBlobIdsByStorage(THashMap>& result, const TVersionedIndex& index) const; diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h index 55c25ace4869..96ad743b1a9c 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h @@ -45,6 +45,10 @@ class IIndexMeta { using TFactory = NObjectFactory::TObjectFactory; using TProto = NKikimrSchemeOp::TOlapIndexDescription; + bool IsInplaceData() const { + return StorageId == NBlobOperations::TGlobal::LocalMetadataStorageId; + } + IIndexMeta() = default; IIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId) : IndexName(indexName) diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp index 7dcbaa9db476..362ee4a098bd 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp @@ -20,7 +20,7 @@ std::shared_ptr TCountMinSketchConstructor } AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second); } - return std::make_shared(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::LocalMetadataStorageId), columnIds); + return std::make_shared(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnIds); } NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {