Skip to content

Commit

Permalink
Merge 0d5a97e into 916a423
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 3, 2025
2 parents 916a423 + 0d5a97e commit 54deb11
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 23 deletions.
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/blobs_action/counters/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum class EConsumer {
WRITING_BUFFER,
WRITING_OPERATOR,
NORMALIZER,
STATISTICS,

COUNT
};
Expand Down
107 changes: 85 additions & 22 deletions ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
#include "columnshard.h"
#include "columnshard_impl.h"

#include "ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h"

#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>

#include <yql/essentials/core/minsketch/count_min_sketch.h>


namespace NKikimr::NColumnShard {

void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const TActorContext&) {
auto& requestRecord = ev->Get()->Record;
// TODO Start a potentially long analysis process.
// ...



// Return the response when the analysis is completed
auto response = std::make_unique<NStat::TEvStatistics::TEvAnalyzeTableResponse>();
auto& responseRecord = response->Record;
Expand Down Expand Up @@ -64,8 +62,7 @@ class TResultAccumulator {
std::unique_ptr<NStat::TEvStatistics::TEvStatisticsResponse>&& response)
: RequestSenderActorId(requestSenderActorId)
, Cookie(cookie)
, Response(std::move(response))
{
, Response(std::move(response)) {
for (auto&& i : tags) {
AFL_VERIFY(Calculated.emplace(i, nullptr).second);
}
Expand Down Expand Up @@ -104,11 +101,11 @@ class TResultAccumulator {
OnResultReady();
}
}

};

class TColumnPortionsAccumulator {
private:
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
const std::set<ui32> ColumnTagsRequested;
std::vector<NOlap::TPortionInfo::TConstPtr> Portions;
const ui32 PortionsCountLimit = 10000;
Expand All @@ -117,19 +114,66 @@ class TColumnPortionsAccumulator {
const std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;

public:
TColumnPortionsAccumulator(const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit,
TColumnPortionsAccumulator(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const std::shared_ptr<TResultAccumulator>& result,
const ui32 portionsCountLimit,
const std::set<ui32>& originalColumnTags, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
: ColumnTagsRequested(originalColumnTags)
: StoragesManager(storagesManager)
, ColumnTagsRequested(originalColumnTags)
, PortionsCountLimit(portionsCountLimit)
, DataAccessors(dataAccessorsManager)
, Result(result)
, VersionedIndex(vIndex)
{
, VersionedIndex(vIndex) {
}

class TIndexReadTask: public NOlap::NBlobOperations::NRead::ITask {
private:
using TBase = NOlap::NBlobOperations::NRead::ITask;
const std::shared_ptr<TResultAccumulator> Result;
THashMap<ui32, THashMap<TString, std::vector<TBlobRange>>> RangesByColumn;
THashMap<ui32, std::unique_ptr<TCountMinSketch>> SketchesByColumns;

protected:
virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) override {
TCompositeReadBlobs blobs = ExtractBlobsData();
for (auto&& [columnId, data] : RangesByColumn) {
for (auto&& [storageId, blobs] : data) {
for (auto&& b : blobs) {
const TString blob = blobs.Extract(storageId, b);
auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(blob.data(), blob.size()));
*sketchesByColumns[columnId] += *sketch;
}
}
}
Result->AddResult(std::move(sketchesByColumns));
}

virtual bool DoOnError(
const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)(
"status", status.GetErrorMessage())("status_code", status.GetStatus());
AFL_VERIFY(status.GetStatus() != NKikimrProto::EReplyStatus::NODATA)("blob_id", range)("status", status.GetStatus())(
"error", status.GetErrorMessage())("type", TxEvent->IndexChanges->TypeString())(
"task_id", TxEvent->IndexChanges->GetTaskIdentifier())("debug", TxEvent->IndexChanges->DebugString());
return false;
}

public:
TIndexReadTask(const std::shared_ptr<TResultAccumulator>& result, std::vector<std::shared_ptr<IBlobsReadingAction>>&& readingActions,
THashMap<ui32, THashMap<TString, std::vector<TBlobRange>>>&& rangesByColumn,
THashMap<ui32, std::unique_ptr<TCountMinSketch>>&& readySketches)
: TBase(std::move(readingActions), "STATISTICS", "STATISTICS")
, Result(result)
, RangesByColumn(std::move(RangesByColumn))
, SketchesByColumns(std::move(readySketches)) {
AFL_VERIFY(!!Result);
AFL_VERIFY(RangesByColumn.size());
}
};

class TMetadataSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
private:
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
const std::shared_ptr<TResultAccumulator> Result;
std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
const std::set<ui32> ColumnTagsRequested;
Expand All @@ -143,6 +187,8 @@ class TColumnPortionsAccumulator {
sketchesByColumns.emplace(id, TCountMinSketch::Create());
}

THashMap<ui32, THashMap<TString, std::vector<TBlobRange>>> rangesByColumn;

for (const auto& [id, portionInfo] : result.GetPortions()) {
std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo.GetPortionInfo().GetSchema(*VersionedIndex);
for (const ui32 columnId : ColumnTagsRequested) {
Expand All @@ -154,26 +200,42 @@ class TColumnPortionsAccumulator {
}
AFL_VERIFY(indexMeta->GetColumnIds().size() == 1);

const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId());
if (!indexMeta->IsInplaceData()) {
portionInfo.FillBlobRangesByStorage(rangesByColumn, portionSchema->GetIndexInfo(), { columnId });
} else {
const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId());

for (const auto& sketchAsString : data) {
auto sketch =
std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
*sketchesByColumns[columnId] += *sketch;
for (const auto& sketchAsString : data) {
auto sketch =
std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
*sketchesByColumns[columnId] += *sketch;
}
}
}
}
Result->AddResult(std::move(sketchesByColumns));
if (rangesByColumn.size()) {
TBlobsAction blobsAction(StoragesManager, NBlobOperations::EConsumer::STATISTICS);
for (auto&& i : rangesByColumn) {
for (auto&& [storageId, ranges]: i.second) {
auto reader = blobsAction.GetReading(storageId);
for (auto&& i : ranges) {
reader->AddRange(i);
}
}
}
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
std::make_shared<TIndexReadTask>(Result, blobsAction.GetReadingActions(), rangesByColumn, sketchesByColumns)));
} else {
Result->AddResult(std::move(sketchesByColumns));
}
}

public:
TMetadataSubscriber(
const std::shared_ptr<TResultAccumulator>& result, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
: Result(result)
, VersionedIndex(vIndex)
, ColumnTagsRequested(tags)
{

, ColumnTagsRequested(tags) {
}
};

Expand All @@ -186,7 +248,7 @@ class TColumnPortionsAccumulator {
for (auto&& i : Portions) {
request->AddPortion(i);
}
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(Result, VersionedIndex, ColumnTagsRequested));
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(StoragesManager, Result, VersionedIndex, ColumnTagsRequested));
Portions.clear();
DataAccessors->AskData(request);
}
Expand Down Expand Up @@ -234,7 +296,8 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
std::shared_ptr<TResultAccumulator> resultAccumulator =
std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response));
auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex());
TColumnPortionsAccumulator portionsPack(resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());
TColumnPortionsAccumulator portionsPack(StoragesManager,
resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());

for (const auto& [_, portionInfo] : spg->GetPortions()) {
if (!portionInfo->IsVisible(GetMaxReadVersion())) {
Expand All @@ -246,4 +309,4 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
resultAccumulator->Start();
}

}
} // namespace NKikimr::NColumnShard
27 changes: 27 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,33 @@ TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssembl
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent);
}

void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result,
const TVersionedIndex& index, const THashSet<ui32>& entityIds) const {
auto schema = PortionInfo->GetSchema(index);
return FillBlobRangesByStorage(result, schema->GetIndexInfo());
}

void TPortionDataAccessor::FillBlobRangesByStorage(
THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TIndexInfo& indexInfo, const THashSet<ui32>& entityIds) const {
for (auto&& i : GetRecordsVerified()) {
if (!entityIds.contains(i.GetEntityId())) {
continue;
}
const TString& storageId = PortionInfo->GetColumnStorageId(i.GetColumnId(), indexInfo);
AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(i.GetBlobRange())).second)(
"blob_id", PortionInfo->RestoreBlobRange(i.GetBlobRange()).ToString());
}
for (auto&& i : GetIndexesVerified()) {
if (!entityIds.contains(i.GetEntityId())) {
continue;
}
const TString& storageId = PortionInfo->GetIndexStorageId(i.GetIndexId(), indexInfo);
auto bRange = i.GetBlobRangeVerified();
AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(bRange)).second)(
"blob_id", PortionInfo->RestoreBlobRange(bRange).ToString());
}
}

void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const {
auto schema = PortionInfo->GetSchema(index);
return FillBlobRangesByStorage(result, schema->GetIndexInfo());
Expand Down
16 changes: 16 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,19 @@ class TPortionDataAccessor {
return result;
}

THashMap<TString, std::vector<TBlobRange>> GetIndexRangesVerified(const ui32 indexId) const {
if (!Indexes) {
return {};
}
THashMap<TString, std::vector<TBlobRange>> result;
for (auto&& i : *Indexes) {
if (i.GetEntityId() == indexId) {
result.emplace_back(i.GetBlobDataVerified());
}
}
return result;
}

std::set<ui32> GetColumnIds() const {
std::set<ui32> result;
for (auto&& i : GetRecordsVerified()) {
Expand Down Expand Up @@ -229,6 +242,9 @@ class TPortionDataAccessor {

void FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TIndexInfo& indexInfo) const;
void FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const;
void FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TIndexInfo& indexInfo, const THashSet<ui32>& entityIds) const;
void FillBlobRangesByStorage(
THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TVersionedIndex& index, const THashSet<ui32>& entityIds) const;
void FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobId>>& result, const TIndexInfo& indexInfo) const;
void FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobId>>& result, const TVersionedIndex& index) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TCountMinSketchConstructor
}
AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second);
}
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::LocalMetadataStorageId), columnIds);
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnIds);
}

NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
Expand Down

0 comments on commit 54deb11

Please sign in to comment.