Skip to content

Commit

Permalink
Merge b558bd1 into 8e44a97
Browse files Browse the repository at this point in the history
  • Loading branch information
ildar-khisambeev authored Aug 21, 2024
2 parents 8e44a97 + b558bd1 commit 13e0649
Show file tree
Hide file tree
Showing 27 changed files with 754 additions and 28 deletions.
4 changes: 4 additions & 0 deletions ydb/core/formats/arrow/protos/ssa.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ message TProgram {
repeated uint64 HashValues = 1;
}

// Checker payload for count-min-sketch indexes.
// It currently carries no fields; it is selected through the
// `CountMinSketch` member of the `Implementation` oneof in TOlapIndexChecker.
message TCountMinSketchChecker {
}

message TOlapIndexChecker {
optional uint32 IndexId = 1;
optional string ClassName = 2;
Expand All @@ -56,6 +59,7 @@ message TProgram {
oneof Implementation {
TBloomFilterChecker BloomFilter = 40;
TCompositeChecker Composite = 41;
TCountMinSketchChecker CountMinSketch = 42;
}
}

Expand Down
142 changes: 134 additions & 8 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
#include <ydb/library/minsketch/stack_count_min_sketch.h>

#include <ydb/core/statistics/events.h>

#include <library/cpp/testing/unittest/registar.h>

Expand Down Expand Up @@ -59,7 +62,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand All @@ -68,7 +71,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
}
{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand Down Expand Up @@ -105,6 +108,129 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
}
}

// Creates a COUNT_MIN_SKETCH index for each of five columns of the test OLAP store, writes test data,
// then sends TEvStatisticsRequest to the column shards and checks that every response is successful,
// reports five columns, and carries a sketch that merges into a non-empty accumulated sketch.
Y_UNIT_TEST(CountMinSketchIndex) {
    auto settings = TKikimrSettings()
        .SetWithSampleTables(false);
    TKikimrRunner kikimr(settings);

    auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
    csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
    csController->SetLagForCompactionBeforeTierings(TDuration::Seconds(1));
    csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

    TLocalHelper(kikimr).CreateTestOlapTable();
    auto tableClient = kikimr.GetTableClient();

    Tests::NCommon::TLoggerInit(kikimr).SetComponents({NKikimrServices::TX_COLUMNSHARD}, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();

    // Each entry describes one single-column sketch index: {index name, indexed column}.
    struct TSketchIndex {
        const char* Name;
        const char* Column;
    };
    const TSketchIndex sketchIndexes[] = {
        {"cms_ts", "timestamp"},
        {"cms_res_id", "resource_id"},
        {"cms_uid", "uid"},
        {"cms_level", "level"},
        {"cms_message", "message"},
    };

    for (const auto& sketchIndex : sketchIndexes) {
        // The statement text is the same for every index except for the two substituted names.
        const TString alterQuery = TStringBuilder()
            << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=" << sketchIndex.Name
            << ", TYPE=COUNT_MIN_SKETCH,\n"
            << "FEATURES=`{\"column_names\" : ['" << sketchIndex.Column << "']}`);\n";
        auto session = tableClient.CreateSession().GetValueSync().GetSession();
        auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
        UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
    }

    WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);

    csController->WaitActualization(TDuration::Seconds(10));
    {
        auto runtime = kikimr.GetTestServer().GetRuntime();
        auto sender = runtime->AllocateEdgeActor();

        // Upper bound on the number of shards that are inspected and queried.
        constexpr size_t maxShards = 3;

        // Collect the path ids known to the inspected shards; the test expects a single table.
        std::set<ui64> pathids;
        size_t shardsInspected = 0;
        for (auto&& i : csController->GetShardActualIds()) {
            Cerr << ">>> shard actual id: " << i << Endl;
            for (auto&& j : csController->GetPathIds(i)) {
                Cerr << ">>> path id: " << j << Endl;
                pathids.insert(j);
            }
            if (++shardsInspected == maxShards) {
                break;
            }
        }

        UNIT_ASSERT_VALUES_EQUAL(pathids.size(), 1u);
        const ui64 pathId = *pathids.begin();

        // Send one statistics request per shard and remember how many were actually sent,
        // so the receive loop below never waits for a response that was not requested.
        size_t requestsSent = 0;
        for (auto&& i : csController->GetShardActualIds()) {
            auto request = std::make_unique<NStat::TEvStatistics::TEvStatisticsRequest>();
            request->Record.MutableTable()->MutablePathId()->SetLocalId(pathId);

            runtime->Send(MakePipePerNodeCacheID(false), sender, new TEvPipeCache::TEvForward(
                request.release(), i, false));
            if (++requestsSent == maxShards) {
                break;
            }
        }

        TAutoPtr<IEventHandle> handle;
        auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::Create());
        for (size_t received = 0; received < requestsSent; ++received) {
            auto event = runtime->GrabEdgeEvent<NStat::TEvStatistics::TEvStatisticsResponse>(handle);
            UNIT_ASSERT(event);

            auto& response = event->Record;
            // Cerr << response << Endl;
            UNIT_ASSERT_VALUES_EQUAL(response.GetStatus(), NKikimrStat::TEvStatisticsResponse::STATUS_SUCCESS);
            UNIT_ASSERT(response.ColumnsSize() == 5);

            // Only the first statistic of the first returned column is merged into the accumulator.
            const auto& someData = response.GetColumns(0).GetStatistics(0).GetData();
            const std::unique_ptr<TCountMinSketch> shardSketch(TCountMinSketch::FromString(someData.data(), someData.size()));
            *sketch += *shardSketch;
            Cerr << ">>> sketch.GetElementCount() = " << sketch->GetElementCount() << Endl;
            UNIT_ASSERT(sketch->GetElementCount() > 0);
        }
    }
}

Y_UNIT_TEST(SchemeActualizationOnceOnStart) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
Expand Down Expand Up @@ -194,7 +320,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() << Sprintf(
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`);
)", StorageId.data());
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand All @@ -203,7 +329,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
}
{
auto alterQuery = TStringBuilder() << Sprintf(
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`);
)", StorageId.data()
);
Expand Down Expand Up @@ -347,7 +473,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand All @@ -357,7 +483,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid", "resource_id"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand All @@ -367,7 +493,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.005}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand All @@ -377,7 +503,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.01}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,11 @@ message TRequestedMaxIndex {
optional string ColumnName = 1;
}

// Parameters of a requested count-min-sketch index; selected through the
// `CountMinSketch` member of the `Implementation` oneof in TOlapIndexRequested.
message TRequestedCountMinSketch {
// Names of the columns the sketch is built on; the sketch is built on the
// combined data from this set of columns.
repeated string ColumnNames = 1;
}

message TOlapIndexRequested {
optional string Name = 1;
optional TCompressionOptions Compression = 3;
Expand All @@ -491,6 +496,7 @@ message TOlapIndexRequested {
oneof Implementation {
TRequestedBloomFilter BloomFilter = 40;
TRequestedMaxIndex MaxIndex = 41;
TRequestedCountMinSketch CountMinSketch = 42;
}
}

Expand All @@ -504,6 +510,10 @@ message TMaxIndex {
optional uint32 ColumnId = 1;
}

// Count-min-sketch variant of an OLAP index description; selected through the
// `CountMinSketch` member of the `Implementation` oneof in TOlapIndexDescription.
message TCountMinSketch {
// Ids of the columns covered by the sketch.
// NOTE(review): presumably the resolved counterparts of
// TRequestedCountMinSketch.ColumnNames — confirm against the schemeshard code.
repeated uint32 ColumnIds = 1;
}

message TOlapIndexDescription {
// This id is auto-generated by schemeshard
optional uint32 Id = 1;
Expand All @@ -517,6 +527,7 @@ message TOlapIndexDescription {
oneof Implementation {
TBloomFilter BloomFilter = 41;
TMaxIndex MaxIndex = 42;
TCountMinSketch CountMinSketch = 43;
}
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/out/out.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrDataEvents::TEvWrite::ETxMode, stream, value) {
// Stream-output specialization for NKikimrStat::TEvAnalyzeStatusResponse_EStatus:
// writes the string returned by TEvAnalyzeStatusResponse_EStatus_Name(value)
// (presumably the protobuf-generated enum name) rather than the raw numeric value.
Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvAnalyzeStatusResponse_EStatus, stream, value) {
stream << NKikimrStat::TEvAnalyzeStatusResponse_EStatus_Name(value);
}

// Stream-output specialization for NKikimrStat::TEvStatisticsResponse::EStatus:
// writes the string returned by TEvStatisticsResponse::EStatus_Name(value)
// (presumably the protobuf-generated enum name) rather than the raw numeric value.
Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvStatisticsResponse::EStatus, stream, value) {
stream << NKikimrStat::TEvStatisticsResponse::EStatus_Name(value);
}
14 changes: 13 additions & 1 deletion ydb/core/statistics/ut_common/ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,21 @@ void CreateColumnStoreTable(TTestEnv& env, const TString& databaseName, const TS
)", fullTableName.c_str(), shardCount)).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

result = session.ExecuteSchemeQuery(Sprintf(R"(
ALTER OBJECT `%s` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_key, TYPE=COUNT_MIN_SKETCH,
FEATURES=`{"column_names" : ['Key']}`);
)", fullTableName.c_str())).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

result = session.ExecuteSchemeQuery(Sprintf(R"(
ALTER OBJECT `%s` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_value, TYPE=COUNT_MIN_SKETCH,
FEATURES=`{"column_names" : ['Value']}`);
)", fullTableName.c_str())).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

NYdb::TValueBuilder rows;
rows.BeginList();
for (size_t i = 0; i < 100; ++i) {
for (size_t i = 0; i < 1000000; ++i) {
auto key = TValueBuilder().Uint64(i).Build();
auto value = TValueBuilder().OptionalUint64(i).Build();
rows.AddListItem();
Expand Down
64 changes: 54 additions & 10 deletions ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
#include "columnshard.h"
#include "columnshard_impl.h"
#include "ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h"

#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>

#include <ydb/library/minsketch/count_min_sketch.h>


namespace NKikimr::NColumnShard {

Expand All @@ -22,23 +27,62 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const
}

// Handles a statistics request for one table of this column shard.
//
// For every requested column tag the handler merges the count-min-sketch index data stored
// in the table's portions into one sketch and replies with one serialized sketch per column.
// When the request lists no column tags, all column ids of the latest schema are used.
// Columns for which a portion has no count-min-sketch index are skipped for that portion
// (a warning is logged), so their resulting sketch may stay empty.
void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, const TActorContext&) {
    AFL_VERIFY(HasIndex());
    // Bind by forwarding reference so the engine object is not copied if GetIndexAs returns a reference.
    auto&& index = GetIndexAs<NOlap::TColumnEngineForLogs>();
    auto spg = index.GetGranuleOptional(ev->Get()->Record.GetTable().GetPathId().GetLocalId());
    AFL_VERIFY(spg);

    const auto& requestedTags = ev->Get()->Record.GetTable().GetColumnTags();
    std::set<ui32> columnTagsRequested(requestedTags.begin(), requestedTags.end());
    if (columnTagsRequested.empty()) {
        // No explicit selection: fall back to every column id of the latest schema.
        auto schema = index.GetVersionedIndex().GetLastSchema();
        auto allColumnIds = schema->GetIndexInfo().GetColumnIds(false);
        columnTagsRequested = std::set<ui32>(allColumnIds.begin(), allColumnIds.end());
    }

    // One accumulator sketch per requested column, created up front so that every
    // requested column appears in the response.
    std::map<ui32, std::unique_ptr<TCountMinSketch>> sketchesByColumns;
    for (auto id : columnTagsRequested) {
        sketchesByColumns.emplace(id, TCountMinSketch::Create());
    }

    for (const auto& [indexKey, keyPortions] : spg->GetPortionsIndex().GetPoints()) {
        for (auto&& [_, portionInfo] : keyPortions.GetStart()) {
            // Index metadata is resolved against the schema the portion was written with.
            std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo->GetSchema(index.GetVersionedIndex());
            for (ui32 columnId : columnTagsRequested) {
                auto indexMeta = portionSchema->GetIndexInfo().GetIndexCountMinSketch({columnId});

                if (!indexMeta) {
                    AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "Missing countMinSketch index for columnId " + ToString(columnId));
                    continue;
                }
                AFL_VERIFY(indexMeta->GetColumnIds().size() == 1);

                const std::vector<TString> data = portionInfo->GetIndexInplaceDataVerified(indexMeta->GetIndexId());

                auto& mergedSketch = *sketchesByColumns.at(columnId);
                for (const auto& sketchAsString : data) {
                    const std::unique_ptr<TCountMinSketch> portionSketch(
                        TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
                    mergedSketch += *portionSketch;
                }
            }
        }
    }

    auto response = std::make_unique<NStat::TEvStatistics::TEvStatisticsResponse>();
    auto& record = response->Record;
    record.SetShardTabletId(TabletID());

    record.SetStatus(NKikimrStat::TEvStatisticsResponse::STATUS_SUCCESS);

    // The map holds exactly the requested tags, in the same ascending order as the tag set.
    for (const auto& [columnTag, mergedSketch] : sketchesByColumns) {
        auto* column = record.AddColumns();
        column->SetTag(columnTag);

        auto* statistic = column->AddStatistics();
        statistic->SetType(NStat::COUNT_MIN_SKETCH);
        statistic->SetData(TString(mergedSketch->AsStringBuf()));
    }

    Send(ev->Sender, response.release(), 0, ev->Cookie);
}
Expand Down
Loading

0 comments on commit 13e0649

Please sign in to comment.