Skip to content

Commit

Permalink
Merge bd074c6 into 42d0f96
Browse files Browse the repository at this point in the history
  • Loading branch information
ildar-khisambeev authored Jul 17, 2024
2 parents 42d0f96 + bd074c6 commit e510d9a
Show file tree
Hide file tree
Showing 15 changed files with 511 additions and 5 deletions.
10 changes: 10 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ message TRequestedMaxIndex {
optional string ColumnName = 1;
}

// Payload of the CountMinSketch alternative of TOlapIndexRequested.Implementation:
// a request for a count-min sketch index, with columns referenced by name.
message TRequestedCountMinSketch {
// Names of the columns the requested index is built over.
repeated string ColumnNames = 1;
}

message TOlapIndexRequested {
optional string Name = 1;
optional TCompressionOptions Compression = 3;
Expand All @@ -464,6 +468,7 @@ message TOlapIndexRequested {
oneof Implementation {
TRequestedBloomFilter BloomFilter = 40;
TRequestedMaxIndex MaxIndex = 41;
TRequestedCountMinSketch CountMinSketch = 42;
}
}

Expand All @@ -477,6 +482,10 @@ message TMaxIndex {
optional uint32 ColumnId = 1;
}

// Payload of the CountMinSketch alternative of TOlapIndexDescription.Implementation:
// description of a count-min sketch index, with columns referenced by id.
message TCountMinSketch {
// Ids of the columns the index is built over.
repeated uint32 ColumnIds = 1;
}

message TOlapIndexDescription {
// This id is auto-generated by schemeshard
optional uint32 Id = 1;
Expand All @@ -490,6 +499,7 @@ message TOlapIndexDescription {
oneof Implementation {
TBloomFilter BloomFilter = 41;
TMaxIndex MaxIndex = 42;
TCountMinSketch CountMinSketch = 43;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include "checker.h"
#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/formats/arrow/common/validation.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>

namespace NKikimr::NOlap::NIndexes::NCountMinSketch {

void TCountMinSketchChecker::DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const {
Y_ABORT("Unimplemented"); // unimplemented, should not be used
}

// Checking blobs against this index is not implemented; any call ends in Y_ABORT.
// The parameter is left unnamed because it is never read.
bool TCountMinSketchChecker::DoCheckImpl(const std::vector<TString>& /*blobs*/) const {
    // Deliberately unimplemented: this method is not expected to be used.
    Y_ABORT("Unimplemented");
    // Kept so that the function has a return statement on every textual path.
    return false;
}

// Deserialization of this checker is not implemented; any call ends in Y_ABORT.
// The parameter is left unnamed because it is never read.
bool TCountMinSketchChecker::DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& /*proto*/) {
    // Deliberately unimplemented: this method is not expected to be used.
    Y_ABORT("Unimplemented");
    // Kept so that the function has a return statement on every textual path.
    return false;
}

} // namespace NKikimr::NOlap::NIndexes::NCountMinSketch
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h>

namespace NKikimr::NOlap::NIndexes::NCountMinSketch {

// Index checker registered in the checker factory under the "COUNT_MIN_SKETCH" class name.
// Only declarations live here; the Do*Impl overrides are defined out of line.
class TCountMinSketchChecker: public TSimpleIndexChecker {
public:
// Class name used for the factory registration below and returned by GetClassName().
static TString GetClassNameStatic() {
return "COUNT_MIN_SKETCH";
}
private:
using TBase = TSimpleIndexChecker;
// Static registrator object that ties this class to GetClassNameStatic() in TFactory.
static inline auto Registrator = TFactory::TRegistrator<TCountMinSketchChecker>(GetClassNameStatic());

protected:
// Proto (de)serialization hooks of the checker.
virtual bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override;
virtual void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override;

// Check hook that receives the index blobs.
virtual bool DoCheckImpl(const std::vector<TString>& blobs) const override;

public:
TCountMinSketchChecker() = default;
// Creates a checker bound to the index with the given id (forwarded to the base class).
TCountMinSketchChecker(const ui32 indexId)
: TBase(indexId)
{}

// Returns the same name as GetClassNameStatic().
virtual TString GetClassName() const override {
return GetClassNameStatic();
}
};

} // namespace NKikimr::NOlap::NIndexes::NCountMinSketch
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#include "constructor.h"
#include "meta.h"

#include <ydb/core/tx/schemeshard/olap/schema/schema.h>

namespace NKikimr::NOlap::NIndexes::NCountMinSketch {

// Builds the index meta for this constructor.
//
// Every configured column name is looked up in `currentSchema`; the ids of the found columns
// form the column set of the resulting meta. If a name is not present in the schema, an error
// is reported through `errors` and nullptr is returned.
std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TCountMinSketchConstructor::DoCreateIndexMeta(const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
    std::set<ui32> columnIds;
    for (const TString& columnName : ColumnNames) {
        auto* columnInfo = currentSchema.GetColumns().GetByName(columnName);
        if (columnInfo == nullptr) {
            errors.AddError("no column with name " + columnName);
            return nullptr;
        }
        // Two different names resolving to one column id is treated as a broken invariant.
        AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second);
    }
    // NOTE(review): the TIndexMeta constructor declared in meta.h takes
    // (indexId, indexName, storageId, columnIds), while this call passes no storageId.
    // Confirm which constructor is intended.
    return std::make_shared<TIndexMeta>(indexId, indexName, columnIds);
}

// Reads the index features from JSON.
//
// Expects `jsonInfo` to contain a "column_names" key holding an array of strings. Returns a
// failed status when the key is missing, is not an array, or contains a non-string element.
//
// Names are collected into a local set first and added to ColumnNames only after the whole
// array has been validated, so a failed call leaves ColumnNames untouched.
NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
    if (!jsonInfo.Has("column_names")) {
        return TConclusionStatus::Fail("column_names have to be in count min sketch features");
    }
    const NJson::TJsonValue::TArray* columnNamesArray = nullptr;
    if (!jsonInfo["column_names"].GetArrayPointer(&columnNamesArray)) {
        return TConclusionStatus::Fail("column_names have to be in count min sketch features as array ['column_name_1', ... , 'column_name_N']");
    }
    std::set<TString> parsedNames;
    for (const auto& item : *columnNamesArray) {
        if (!item.IsString()) {
            return TConclusionStatus::Fail("column_names have to be in count min sketch features as array of strings ['column_name_1', ... , 'column_name_N']");
        }
        parsedNames.emplace(item.GetString());
    }
    // Commit only after every element has been validated.
    ColumnNames.insert(parsedNames.begin(), parsedNames.end());
    return TConclusionStatus::Success();
}

// Reads the requested column names from the CountMinSketch section of `proto`.
//
// When the section is absent, the problem is logged and a failed status carrying the same
// message is returned. Otherwise every listed name is added to ColumnNames.
NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) {
    if (proto.HasCountMinSketch()) {
        const auto& sketchProto = proto.GetCountMinSketch();
        for (const auto& columnName : sketchProto.GetColumnNames()) {
            ColumnNames.emplace(columnName);
        }
        return TConclusionStatus::Success();
    }

    const TString errorMessage = "not found CountMinSketch section in proto: \"" + proto.DebugString() + "\"";
    AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage);
    return TConclusionStatus::Fail(errorMessage);
}

// Writes the configured column names into the CountMinSketch section of `proto`.
// MutableCountMinSketch() is called even when ColumnNames is empty.
void TCountMinSketchConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const {
    auto* const target = proto.MutableCountMinSketch();
    for (const TString& columnName : ColumnNames) {
        target->AddColumnNames(columnName);
    }
}

} // namespace NKikimr::NOlap::NIndexes::NCountMinSketch
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h>

namespace NKikimr::NOlap::NIndexes::NCountMinSketch {

// Index meta constructor registered in the constructor factory under the "COUNT_MIN_SKETCH"
// class name. Holds the names of the columns requested for the index.
class TCountMinSketchConstructor: public IIndexMetaConstructor {
public:
// Class name used for the factory registration below and returned by GetClassName().
static TString GetClassNameStatic() {
return "COUNT_MIN_SKETCH";
}
private:
// Requested column names; filled by the DoDeserializeFrom* methods.
std::set<TString> ColumnNames;
// Static registrator object that ties this class to GetClassNameStatic() in TFactory.
static inline auto Registrator = TFactory::TRegistrator<TCountMinSketchConstructor>(GetClassNameStatic());

protected:
// Creates the index meta for the given index id/name against the given schema.
virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override;

// Reads the constructor settings from JSON.
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override;

// Proto (de)serialization of the constructor settings.
virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override;
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override;

public:
TCountMinSketchConstructor() = default;

// Returns the same name as GetClassNameStatic().
virtual TString GetClassName() const override {
return GetClassNameStatic();
}
};

} // namespace NKikimr::NOlap::NIndexes::NCountMinSketch
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include "meta.h"
#include "checker.h"
#include <ydb/core/formats/arrow/hash/xx_hash.h>
#include <ydb/core/formats/arrow/hash/calcer.h>
#include <ydb/core/tx/program/program.h>
#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
#include <ydb/library/minsketch/stack_count_min_sketch.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
#include <library/cpp/deprecated/atomic/atomic.h>

namespace NKikimr::NOlap::NIndexes::NCountMinSketch {

// Builds the index blob: one count-min sketch per column reader, fed with every record that
// `reader` yields.
//
// The number of column readers must equal ColumnIds.size() (verified). For each record the
// current value of every column is added to that column's sketch: values of arrow types with
// a C type are counted through TCell::Make, values of types with a string view are counted by
// their raw bytes. Any other type fails verification.
//
// Returns the raw bytes of the sketch array, one sketch after another in reader order.
TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const {
    using TSketch = TStackAllocatedCountMinSketch<256, 8>;
    std::vector<TSketch> sketchesByColumns(ColumnIds.size());

    AFL_VERIFY(std::distance(reader.begin(), reader.end()) == static_cast<long>(sketchesByColumns.size()));

    for (reader.Start(); reader.IsCorrect(); reader.ReadNext()) {
        size_t sketchIndex = 0;
        for (auto&& colReader : reader) {
            auto array = colReader.GetCurrentChunk();
            auto& sketch = sketchesByColumns[sketchIndex];
            const int recordIndex = colReader.GetCurrentRecordIndex();

            NArrow::SwitchType(array->type_id(), [&](const auto& type) {
                using TWrap = std::decay_t<decltype(type)>;
                using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;

                const TArray& arrTyped = static_cast<const TArray&>(*array);
                // The chain makes every instantiation of the lambda return bool, including
                // the one for a type that matches neither supported category.
                if constexpr (arrow::has_c_type<typename TWrap::T>()) {
                    auto cell = TCell::Make(arrTyped.Value(recordIndex));
                    sketch.Count(cell.Data(), cell.Size());
                    return true;
                } else if constexpr (arrow::has_string_view<typename TWrap::T>()) {
                    auto view = arrTyped.GetView(recordIndex);
                    sketch.Count(view.data(), view.size());
                    return true;
                } else {
                    AFL_VERIFY(false);
                    return false;
                }
            });
            ++sketchIndex;
        }
    }

    // NOTE(review): this copies the vector storage as raw bytes and assumes that
    // sizeof(TSketch) equals TSketch::GetSize() — confirm against stack_count_min_sketch.h.
    return TString(reinterpret_cast<const char*>(sketchesByColumns.data()), sketchesByColumns.size() * TSketch::GetSize());
}

// Appends a TCountMinSketchChecker for this index to every branch of `info`.
// The schema argument is not used, so its name is omitted.
//
// NOTE(review): TCountMinSketchChecker::DoCheckImpl is unimplemented and aborts when called;
// confirm that the checkers attached here are never actually evaluated.
void TIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& /*schema*/) const {
    for (auto&& checkersBranch : info->GetBranches()) {
        auto checker = std::make_shared<TCountMinSketchChecker>(GetIndexId());
        checkersBranch->MutableIndexes().emplace_back(std::move(checker));
    }
}

} // namespace NKikimr::NOlap::NIndexes::NCountMinSketch
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h>

namespace NKikimr::NOlap::NIndexes::NCountMinSketch {

class TIndexMeta: public TIndexByColumns {
public:
static TString GetClassNameStatic() {
return "COUNT_MIN_SKETCH";
}

private:
using TBase = TIndexByColumns;

static inline auto Registrator = TFactory::TRegistrator<TIndexMeta>(GetClassNameStatic());

protected:
virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const override {
const auto* bMeta = dynamic_cast<const TIndexMeta*>(&newMeta);
if (!bMeta) {
return TConclusionStatus::Fail("cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName());
}
return TBase::CheckSameColumnsForModification(newMeta);
}

virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override;

virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader) const override;

virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override {
AFL_VERIFY(TBase::DoDeserializeFromProto(proto));
AFL_VERIFY(proto.HasCountMinSketch());
auto& sketch = proto.GetCountMinSketch();
for (auto&& i : sketch.GetColumnIds()) {
ColumnIds.emplace(i);
}
return true;
}

virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override {
auto* sketchProto = proto.MutableCountMinSketch();
for (auto&& i : ColumnIds) {
sketchProto->AddColumnIds(i);
}
}

public:
TIndexMeta() = default;
TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, std::set<ui32>& columnIds)
: TBase(indexId, indexName, columnIds, storageId) {
}

virtual TString GetClassName() const override {
return GetClassNameStatic();
}
};

} // namespace NKikimr::NOlap::NIndexes::NCountMinSketch
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Library with the count-min sketch index sources (constructor, meta, checker).
# All sources are listed as GLOBAL.
# NOTE(review): presumably GLOBAL is needed so the static factory registrators defined in
# these files are linked in — confirm.
# NOTE(review): meta.cpp includes headers from ydb/library/minsketch and
# ydb/core/tx/schemeshard/olap/schema, neither of which is listed in PEERDIR below —
# confirm they are reachable transitively or add them explicitly.
LIBRARY()

SRCS(
GLOBAL constructor.cpp
GLOBAL meta.cpp
GLOBAL checker.cpp
)

PEERDIR(
ydb/core/protos
ydb/core/formats/arrow
ydb/core/tx/columnshard/engines/storage/indexes/portions
)

END()
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ class TPortionIndexChunk: public IPortionDataChunk {
class TIndexByColumns: public IIndexMeta {
private:
using TBase = IIndexMeta;
std::shared_ptr<NArrow::NSerialization::ISerializer> Serializer;

protected:
std::shared_ptr<NArrow::NSerialization::ISerializer> Serializer;
std::set<ui32> ColumnIds;

virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader) const = 0;

virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const override final;
Expand All @@ -69,4 +71,4 @@ class TIndexByColumns: public IIndexMeta {
TIndexByColumns(const ui32 indexId, const TString& indexName, const std::set<ui32>& columnIds, const TString& storageId);
};

} // namespace NKikimr::NOlap::NIndexes
} // namespace NKikimr::NOlap::NIndexes
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/storage/indexes/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ PEERDIR(
ydb/core/tx/columnshard/engines/storage/indexes/portions
ydb/core/tx/columnshard/engines/storage/indexes/bloom
ydb/core/tx/columnshard/engines/storage/indexes/max
ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch
)

END()
2 changes: 0 additions & 2 deletions ydb/library/minsketch/count_min_sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include <util/system/types.h>
#include <util/generic/strbuf.h>

#include <vector>

namespace NKikimr {

class TCountMinSketch {
Expand Down
Loading

0 comments on commit e510d9a

Please sign in to comment.