Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix incorrect default storage id for count min sketch index #12643

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/blobs_action/counters/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum class EConsumer {
WRITING_BUFFER,
WRITING_OPERATOR,
NORMALIZER,
STATISTICS,

COUNT
};
Expand Down
128 changes: 100 additions & 28 deletions ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
#include "columnshard.h"
#include "columnshard_impl.h"

#include "ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h"

#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>

#include <yql/essentials/core/minsketch/count_min_sketch.h>


namespace NKikimr::NColumnShard {

void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const TActorContext&) {
auto& requestRecord = ev->Get()->Record;
// TODO Start a potentially long analysis process.
// ...



// Return the response when the analysis is completed
auto response = std::make_unique<NStat::TEvStatistics::TEvAnalyzeTableResponse>();
auto& responseRecord = response->Record;
Expand Down Expand Up @@ -64,8 +63,7 @@ class TResultAccumulator {
std::unique_ptr<NStat::TEvStatistics::TEvStatisticsResponse>&& response)
: RequestSenderActorId(requestSenderActorId)
, Cookie(cookie)
, Response(std::move(response))
{
, Response(std::move(response)) {
for (auto&& i : tags) {
AFL_VERIFY(Calculated.emplace(i, nullptr).second);
}
Expand Down Expand Up @@ -104,11 +102,11 @@ class TResultAccumulator {
OnResultReady();
}
}

};

class TColumnPortionsAccumulator {
private:
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
const std::set<ui32> ColumnTagsRequested;
std::vector<NOlap::TPortionInfo::TConstPtr> Portions;
const ui32 PortionsCountLimit = 10000;
Expand All @@ -117,19 +115,68 @@ class TColumnPortionsAccumulator {
const std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;

public:
TColumnPortionsAccumulator(const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit,
const std::set<ui32>& originalColumnTags, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
TColumnPortionsAccumulator(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager,
const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit, const std::set<ui32>& originalColumnTags,
const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
: ColumnTagsRequested(originalColumnTags)
: StoragesManager(storagesManager)
, ColumnTagsRequested(originalColumnTags)
, PortionsCountLimit(portionsCountLimit)
, DataAccessors(dataAccessorsManager)
, Result(result)
, VersionedIndex(vIndex)
{
, VersionedIndex(vIndex) {
}

// Blob-read task for the statistics path: every blob range it was given is parsed as a
// serialized TCountMinSketch and merged into the sketch of the column the range belongs to.
// After all ranges are merged, the per-column sketches are handed to the TResultAccumulator.
class TIndexReadTask: public NOlap::NBlobOperations::NRead::ITask {
private:
    using TBase = NOlap::NBlobOperations::NRead::ITask;
    // Receiver of the merged sketches; verified non-null in the constructor.
    const std::shared_ptr<TResultAccumulator> Result;
    // column id -> storage id -> blob ranges that have to be merged for that column.
    THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> RangesByColumn;
    // column id -> sketch accumulated so far (seeded by the caller through `readySketches`).
    THashMap<ui32, std::unique_ptr<TCountMinSketch>> SketchesByColumns;

public:
    // `readingActions` are forwarded to the base task together with two "STATISTICS" labels
    // (NOTE(review): the meaning of the two string arguments is defined by ITask, not visible here).
    // `rangesByColumn` must be non-empty; `readySketches` must contain an entry for every column
    // id present in `rangesByColumn`, otherwise DoOnDataReady fails its verification.
    TIndexReadTask(const std::shared_ptr<TResultAccumulator>& result,
        std::vector<std::shared_ptr<NOlap::IBlobsReadingAction>>&& readingActions,
        THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>>&& rangesByColumn,
        THashMap<ui32, std::unique_ptr<TCountMinSketch>>&& readySketches)
        : TBase(std::move(readingActions), "STATISTICS", "STATISTICS")
        , Result(result)
        , RangesByColumn(std::move(rangesByColumn))
        , SketchesByColumns(std::move(readySketches))
    {
        AFL_VERIFY(!!Result);
        AFL_VERIFY(RangesByColumn.size());
    }

protected:
    // Walks column -> storage -> range, extracts each blob from the fetched data, deserializes it
    // as a count-min sketch and adds it to the column's accumulated sketch. Finally passes the
    // whole sketch map to Result (the map is moved out, so the task keeps nothing afterwards).
    void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) override {
        NOlap::NBlobOperations::NRead::TCompositeReadBlobs fetched = ExtractBlobsData();
        for (auto& [columnId, rangesByStorage] : RangesByColumn) {
            for (auto& [storageId, ranges] : rangesByStorage) {
                for (const auto& range : ranges) {
                    const TString serialized = fetched.Extract(storageId, range);
                    // FromString returns a raw pointer; wrap it immediately so it is released after the merge.
                    std::unique_ptr<TCountMinSketch> partial(TCountMinSketch::FromString(serialized.data(), serialized.size()));
                    auto it = SketchesByColumns.find(columnId);
                    AFL_VERIFY(it != SketchesByColumns.end());
                    *it->second += *partial;
                }
            }
        }
        Result->AddResult(std::move(SketchesByColumns));
    }

    // Logs the failed read, verifies that the status is not NODATA, and returns false.
    // NOTE(review): how the base task interprets the `false` return value is not visible here.
    bool DoOnError(
        const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)(
            "status", status.GetErrorMessage())("status_code", status.GetStatus());
        AFL_VERIFY(status.GetStatus() != NKikimrProto::EReplyStatus::NODATA)("blob_id", range)("status", status.GetStatus())(
            "error", status.GetErrorMessage())("type", "STATISTICS");
        return false;
    }
};

class TMetadataSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
private:
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
const std::shared_ptr<TResultAccumulator> Result;
std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
const std::set<ui32> ColumnTagsRequested;
Expand All @@ -143,6 +190,9 @@ class TColumnPortionsAccumulator {
sketchesByColumns.emplace(id, TCountMinSketch::Create());
}

THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> rangesByColumn;
THashMap<ui32, ui32> indexIdToColumnId;

for (const auto& [id, portionInfo] : result.GetPortions()) {
std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo.GetPortionInfo().GetSchema(*VersionedIndex);
for (const ui32 columnId : ColumnTagsRequested) {
Expand All @@ -153,27 +203,48 @@ class TColumnPortionsAccumulator {
continue;
}
AFL_VERIFY(indexMeta->GetColumnIds().size() == 1);

const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId());

for (const auto& sketchAsString : data) {
auto sketch =
std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
*sketchesByColumns[columnId] += *sketch;
indexIdToColumnId.emplace(indexMeta->GetIndexId(), columnId);
if (!indexMeta->IsInplaceData()) {
portionInfo.FillBlobRangesByStorage(rangesByColumn, portionSchema->GetIndexInfo(), { indexMeta->GetIndexId() });
} else {
const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId());

for (const auto& sketchAsString : data) {
auto sketch =
std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
*sketchesByColumns[columnId] += *sketch;
}
}
}
}
Result->AddResult(std::move(sketchesByColumns));
if (rangesByColumn.size()) {
NOlap::TBlobsAction blobsAction(StoragesManager, NOlap::NBlobOperations::EConsumer::STATISTICS);
THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> rangesByColumnLocal;
for (auto&& i : rangesByColumn) {
for (auto&& [storageId, ranges] : i.second) {
auto reader = blobsAction.GetReading(storageId);
for (auto&& i : ranges) {
reader->AddRange(i);
}
}
auto it = indexIdToColumnId.find(i.first);
AFL_VERIFY(it != indexIdToColumnId.end());
rangesByColumnLocal.emplace(it->second, std::move(i.second));
}
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(std::make_shared<TIndexReadTask>(
Result, blobsAction.GetReadingActions(), std::move(rangesByColumnLocal), std::move(sketchesByColumns))));
} else {
Result->AddResult(std::move(sketchesByColumns));
}
}

public:
TMetadataSubscriber(
const std::shared_ptr<TResultAccumulator>& result, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
: Result(result)
TMetadataSubscriber(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const std::shared_ptr<TResultAccumulator>& result,
const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
: StoragesManager(storagesManager)
, Result(result)
, VersionedIndex(vIndex)
, ColumnTagsRequested(tags)
{

, ColumnTagsRequested(tags) {
}
};

Expand All @@ -186,7 +257,7 @@ class TColumnPortionsAccumulator {
for (auto&& i : Portions) {
request->AddPortion(i);
}
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(Result, VersionedIndex, ColumnTagsRequested));
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(StoragesManager, Result, VersionedIndex, ColumnTagsRequested));
Portions.clear();
DataAccessors->AskData(request);
}
Expand Down Expand Up @@ -234,7 +305,8 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
std::shared_ptr<TResultAccumulator> resultAccumulator =
std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response));
auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex());
TColumnPortionsAccumulator portionsPack(resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());
TColumnPortionsAccumulator portionsPack(
StoragesManager, resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());

for (const auto& [_, portionInfo] : spg->GetPortions()) {
if (!portionInfo->IsVisible(GetMaxReadVersion())) {
Expand All @@ -246,4 +318,4 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
resultAccumulator->Start();
}

}
} // namespace NKikimr::NColumnShard
27 changes: 27 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,33 @@ TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssembl
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent);
}

// Convenience overload: looks up this portion's schema in the versioned index and delegates to the
// TIndexInfo-based overload, which fills `result` for the entities listed in `entityIds`.
void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result,
    const TVersionedIndex& index, const THashSet<ui32>& entityIds) const {
    // The schema handle is a temporary that stays alive until the delegated call returns.
    FillBlobRangesByStorage(result, PortionInfo->GetSchema(index)->GetIndexInfo(), entityIds);
}

// Collects the blob ranges of this portion that belong to the requested entities.
// Both column records and index chunks are scanned; each matching range is inserted into
// result[entity id][storage id]. Inserting a range that is already present for the same
// entity and storage fails verification.
//   result    - accumulated in place: entity id -> storage id -> set of restored blob ranges.
//   indexInfo - used to resolve the storage id of a column or index.
//   entityIds - only records/indexes whose entity id is in this set are considered.
void TPortionDataAccessor::FillBlobRangesByStorage(
    THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TIndexInfo& indexInfo, const THashSet<ui32>& entityIds) const {
    // Column records.
    for (auto&& i : GetRecordsVerified()) {
        if (entityIds.contains(i.GetEntityId())) {
            const TString& storageId = PortionInfo->GetColumnStorageId(i.GetColumnId(), indexInfo);
            AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(i.GetBlobRange())).second)(
                "blob_id", PortionInfo->RestoreBlobRange(i.GetBlobRange()).ToString());
        }
    }
    // Index chunks.
    for (auto&& i : GetIndexesVerified()) {
        if (entityIds.contains(i.GetEntityId())) {
            const TString& storageId = PortionInfo->GetIndexStorageId(i.GetIndexId(), indexInfo);
            auto bRange = i.GetBlobRangeVerified();
            AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(bRange)).second)(
                "blob_id", PortionInfo->RestoreBlobRange(bRange).ToString());
        }
    }
}

void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const {
auto schema = PortionInfo->GetSchema(index);
return FillBlobRangesByStorage(result, schema->GetIndexInfo());
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ class TPortionDataAccessor {

void FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TIndexInfo& indexInfo) const;
void FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const;
void FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TIndexInfo& indexInfo, const THashSet<ui32>& entityIds) const;
void FillBlobRangesByStorage(
THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TVersionedIndex& index, const THashSet<ui32>& entityIds) const;
void FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobId>>& result, const TIndexInfo& indexInfo) const;
void FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobId>>& result, const TVersionedIndex& index) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class IIndexMeta {
using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>;
using TProto = NKikimrSchemeOp::TOlapIndexDescription;

// True when this index's StorageId is the local-metadata storage id.
// NOTE(review): presumably this means the index payload is kept inline with portion metadata
// rather than in a separate blob storage; confirm against the storage-id definitions.
bool IsInplaceData() const {
    const bool usesLocalMetadataStorage = (StorageId == NBlobOperations::TGlobal::LocalMetadataStorageId);
    return usesLocalMetadataStorage;
}

IIndexMeta() = default;
IIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId)
: IndexName(indexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TCountMinSketchConstructor
}
AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second);
}
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::LocalMetadataStorageId), columnIds);
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnIds);
}

NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
Expand Down
Loading