Skip to content

Commit

Permalink
fix incorrect default storage id for count min sketch index (#12643)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 3, 2025
1 parent 916a423 commit d37db99
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 29 deletions.
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/blobs_action/counters/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum class EConsumer {
WRITING_BUFFER,
WRITING_OPERATOR,
NORMALIZER,
STATISTICS,

COUNT
};
Expand Down
128 changes: 100 additions & 28 deletions ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
#include "columnshard.h"
#include "columnshard_impl.h"

#include "ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h"

#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>

#include <yql/essentials/core/minsketch/count_min_sketch.h>


namespace NKikimr::NColumnShard {

void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const TActorContext&) {
auto& requestRecord = ev->Get()->Record;
// TODO Start a potentially long analysis process.
// ...



// Return the response when the analysis is completed
auto response = std::make_unique<NStat::TEvStatistics::TEvAnalyzeTableResponse>();
auto& responseRecord = response->Record;
Expand Down Expand Up @@ -64,8 +63,7 @@ class TResultAccumulator {
std::unique_ptr<NStat::TEvStatistics::TEvStatisticsResponse>&& response)
: RequestSenderActorId(requestSenderActorId)
, Cookie(cookie)
, Response(std::move(response))
{
, Response(std::move(response)) {
for (auto&& i : tags) {
AFL_VERIFY(Calculated.emplace(i, nullptr).second);
}
Expand Down Expand Up @@ -104,11 +102,11 @@ class TResultAccumulator {
OnResultReady();
}
}

};

class TColumnPortionsAccumulator {
private:
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
const std::set<ui32> ColumnTagsRequested;
std::vector<NOlap::TPortionInfo::TConstPtr> Portions;
const ui32 PortionsCountLimit = 10000;
Expand All @@ -117,19 +115,68 @@ class TColumnPortionsAccumulator {
const std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;

public:
TColumnPortionsAccumulator(const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit,
const std::set<ui32>& originalColumnTags, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
TColumnPortionsAccumulator(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager,
const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit, const std::set<ui32>& originalColumnTags,
const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
: ColumnTagsRequested(originalColumnTags)
: StoragesManager(storagesManager)
, ColumnTagsRequested(originalColumnTags)
, PortionsCountLimit(portionsCountLimit)
, DataAccessors(dataAccessorsManager)
, Result(result)
, VersionedIndex(vIndex)
{
, VersionedIndex(vIndex) {
}

class TIndexReadTask: public NOlap::NBlobOperations::NRead::ITask {
private:
using TBase = NOlap::NBlobOperations::NRead::ITask;
const std::shared_ptr<TResultAccumulator> Result;
THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> RangesByColumn;
THashMap<ui32, std::unique_ptr<TCountMinSketch>> SketchesByColumns;

protected:
virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) override {
NOlap::NBlobOperations::NRead::TCompositeReadBlobs blobsData = ExtractBlobsData();
for (auto&& [columnId, data] : RangesByColumn) {
for (auto&& [storageId, blobs] : data) {
for (auto&& b : blobs) {
const TString blob = blobsData.Extract(storageId, b);
auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(blob.data(), blob.size()));
auto it = SketchesByColumns.find(columnId);
AFL_VERIFY(it != SketchesByColumns.end());
*it->second += *sketch;
}
}
}
Result->AddResult(std::move(SketchesByColumns));
}

virtual bool DoOnError(
const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)(
"status", status.GetErrorMessage())("status_code", status.GetStatus());
AFL_VERIFY(status.GetStatus() != NKikimrProto::EReplyStatus::NODATA)("blob_id", range)("status", status.GetStatus())(
"error", status.GetErrorMessage())("type", "STATISTICS");
return false;
}

public:
TIndexReadTask(const std::shared_ptr<TResultAccumulator>& result,
std::vector<std::shared_ptr<NOlap::IBlobsReadingAction>>&& readingActions,
THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>>&& rangesByColumn,
THashMap<ui32, std::unique_ptr<TCountMinSketch>>&& readySketches)
: TBase(std::move(readingActions), "STATISTICS", "STATISTICS")
, Result(result)
, RangesByColumn(std::move(rangesByColumn))
, SketchesByColumns(std::move(readySketches)) {
AFL_VERIFY(!!Result);
AFL_VERIFY(RangesByColumn.size());
}
};

class TMetadataSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
private:
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
const std::shared_ptr<TResultAccumulator> Result;
std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
const std::set<ui32> ColumnTagsRequested;
Expand All @@ -143,6 +190,9 @@ class TColumnPortionsAccumulator {
sketchesByColumns.emplace(id, TCountMinSketch::Create());
}

THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> rangesByColumn;
THashMap<ui32, ui32> indexIdToColumnId;

for (const auto& [id, portionInfo] : result.GetPortions()) {
std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo.GetPortionInfo().GetSchema(*VersionedIndex);
for (const ui32 columnId : ColumnTagsRequested) {
Expand All @@ -153,27 +203,48 @@ class TColumnPortionsAccumulator {
continue;
}
AFL_VERIFY(indexMeta->GetColumnIds().size() == 1);

const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId());

for (const auto& sketchAsString : data) {
auto sketch =
std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
*sketchesByColumns[columnId] += *sketch;
indexIdToColumnId.emplace(indexMeta->GetIndexId(), columnId);
if (!indexMeta->IsInplaceData()) {
portionInfo.FillBlobRangesByStorage(rangesByColumn, portionSchema->GetIndexInfo(), { indexMeta->GetIndexId() });
} else {
const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId());

for (const auto& sketchAsString : data) {
auto sketch =
std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
*sketchesByColumns[columnId] += *sketch;
}
}
}
}
Result->AddResult(std::move(sketchesByColumns));
if (rangesByColumn.size()) {
NOlap::TBlobsAction blobsAction(StoragesManager, NOlap::NBlobOperations::EConsumer::STATISTICS);
THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> rangesByColumnLocal;
for (auto&& i : rangesByColumn) {
for (auto&& [storageId, ranges] : i.second) {
auto reader = blobsAction.GetReading(storageId);
for (auto&& i : ranges) {
reader->AddRange(i);
}
}
auto it = indexIdToColumnId.find(i.first);
AFL_VERIFY(it != indexIdToColumnId.end());
rangesByColumnLocal.emplace(it->second, std::move(i.second));
}
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(std::make_shared<TIndexReadTask>(
Result, blobsAction.GetReadingActions(), std::move(rangesByColumnLocal), std::move(sketchesByColumns))));
} else {
Result->AddResult(std::move(sketchesByColumns));
}
}

public:
TMetadataSubscriber(
const std::shared_ptr<TResultAccumulator>& result, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
: Result(result)
TMetadataSubscriber(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const std::shared_ptr<TResultAccumulator>& result,
const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
: StoragesManager(storagesManager)
, Result(result)
, VersionedIndex(vIndex)
, ColumnTagsRequested(tags)
{

, ColumnTagsRequested(tags) {
}
};

Expand All @@ -186,7 +257,7 @@ class TColumnPortionsAccumulator {
for (auto&& i : Portions) {
request->AddPortion(i);
}
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(Result, VersionedIndex, ColumnTagsRequested));
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(StoragesManager, Result, VersionedIndex, ColumnTagsRequested));
Portions.clear();
DataAccessors->AskData(request);
}
Expand Down Expand Up @@ -234,7 +305,8 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
std::shared_ptr<TResultAccumulator> resultAccumulator =
std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response));
auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex());
TColumnPortionsAccumulator portionsPack(resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());
TColumnPortionsAccumulator portionsPack(
StoragesManager, resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());

for (const auto& [_, portionInfo] : spg->GetPortions()) {
if (!portionInfo->IsVisible(GetMaxReadVersion())) {
Expand All @@ -246,4 +318,4 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
resultAccumulator->Start();
}

}
} // namespace NKikimr::NColumnShard
27 changes: 27 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,33 @@ TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssembl
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent);
}

void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result,
const TVersionedIndex& index, const THashSet<ui32>& entityIds) const {
auto schema = PortionInfo->GetSchema(index);
return FillBlobRangesByStorage(result, schema->GetIndexInfo(), entityIds);
}

void TPortionDataAccessor::FillBlobRangesByStorage(
THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TIndexInfo& indexInfo, const THashSet<ui32>& entityIds) const {
for (auto&& i : GetRecordsVerified()) {
if (!entityIds.contains(i.GetEntityId())) {
continue;
}
const TString& storageId = PortionInfo->GetColumnStorageId(i.GetColumnId(), indexInfo);
AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(i.GetBlobRange())).second)(
"blob_id", PortionInfo->RestoreBlobRange(i.GetBlobRange()).ToString());
}
for (auto&& i : GetIndexesVerified()) {
if (!entityIds.contains(i.GetEntityId())) {
continue;
}
const TString& storageId = PortionInfo->GetIndexStorageId(i.GetIndexId(), indexInfo);
auto bRange = i.GetBlobRangeVerified();
AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(bRange)).second)(
"blob_id", PortionInfo->RestoreBlobRange(bRange).ToString());
}
}

void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const {
auto schema = PortionInfo->GetSchema(index);
return FillBlobRangesByStorage(result, schema->GetIndexInfo());
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ class TPortionDataAccessor {

void FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TIndexInfo& indexInfo) const;
void FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const;
void FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TIndexInfo& indexInfo, const THashSet<ui32>& entityIds) const;
void FillBlobRangesByStorage(
THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TVersionedIndex& index, const THashSet<ui32>& entityIds) const;
void FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobId>>& result, const TIndexInfo& indexInfo) const;
void FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobId>>& result, const TVersionedIndex& index) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class IIndexMeta {
using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>;
using TProto = NKikimrSchemeOp::TOlapIndexDescription;

bool IsInplaceData() const {
return StorageId == NBlobOperations::TGlobal::LocalMetadataStorageId;
}

IIndexMeta() = default;
IIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId)
: IndexName(indexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TCountMinSketchConstructor
}
AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second);
}
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::LocalMetadataStorageId), columnIds);
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnIds);
}

NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
Expand Down

0 comments on commit d37db99

Please sign in to comment.