Skip to content

Commit

Permalink
count-min-sketch as index (#7978)
Browse files Browse the repository at this point in the history
  • Loading branch information
ildar-khisambeev authored Aug 23, 2024
1 parent 48bd4dd commit 78bb468
Show file tree
Hide file tree
Showing 25 changed files with 610 additions and 56 deletions.
4 changes: 4 additions & 0 deletions ydb/core/formats/arrow/protos/ssa.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ message TProgram {
repeated uint64 HashValues = 1;
}

// Checker payload for count-min-sketch indexes.
// Intentionally carries no fields; it is selected through the `CountMinSketch`
// member of the `Implementation` oneof declared below.
message TCountMinSketchChecker {
}

message TOlapIndexChecker {
optional uint32 IndexId = 1;
optional string ClassName = 2;
Expand All @@ -56,6 +59,7 @@ message TProgram {
oneof Implementation {
TBloomFilterChecker BloomFilter = 40;
TCompositeChecker Composite = 41;
TCountMinSketchChecker CountMinSketch = 42;
}
}

Expand Down
141 changes: 133 additions & 8 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>

#include <ydb/core/statistics/events.h>

#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NKqp {
Expand Down Expand Up @@ -59,7 +61,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand All @@ -68,7 +70,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
}
{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand Down Expand Up @@ -105,6 +107,129 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
}
}

// Creates COUNT_MIN_SKETCH indexes on the test OLAP table, writes data, waits for
// actualization and then checks that column shards answer TEvStatisticsRequest with
// non-empty, mergeable count-min sketches.
Y_UNIT_TEST(CountMinSketchIndex) {
    // Upper bound on the number of column shards inspected and polled for statistics.
    constexpr size_t maxShardsToQuery = 3;
    // Number of columns expected in every statistics response; matches the five
    // single-column indexes created below.
    constexpr size_t expectedColumnCount = 5;
    // All inspected shards are expected to serve one and the same table path.
    constexpr size_t expectedPathCount = 1;

    auto settings = TKikimrSettings()
        .SetWithSampleTables(false);
    TKikimrRunner kikimr(settings);

    auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
    csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
    csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
    csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

    TLocalHelper(kikimr).CreateTestOlapTable();
    auto tableClient = kikimr.GetTableClient();

    Tests::NCommon::TLoggerInit(kikimr).SetComponents({NKikimrServices::TX_COLUMNSHARD}, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();

    // Issues one UPSERT_INDEX request for a single-column count-min sketch and
    // fails the test if the scheme query does not succeed.
    const auto upsertCountMinSketchIndex = [&tableClient](const TString& indexName, const TString& columnName) {
        auto alterQuery = TStringBuilder()
            << R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=)" << indexName
            << R"(, TYPE=COUNT_MIN_SKETCH,
FEATURES=`{"column_names" : [')" << columnName << R"(']}`);
)";
        auto session = tableClient.CreateSession().GetValueSync().GetSession();
        auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
        UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
    };

    upsertCountMinSketchIndex("cms_ts", "timestamp");
    upsertCountMinSketchIndex("cms_res_id", "resource_id");
    upsertCountMinSketchIndex("cms_uid", "uid");
    upsertCountMinSketchIndex("cms_level", "level");
    upsertCountMinSketchIndex("cms_message", "message");

    WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
    WriteTestData(kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);

    csController->WaitActualization(TDuration::Seconds(10));
    {
        auto runtime = kikimr.GetTestServer().GetRuntime();
        auto sender = runtime->AllocateEdgeActor();

        // Collect the path ids served by the first few shards.
        size_t shardsInspected = 0;
        std::set<ui64> pathids;
        for (auto&& shardId : csController->GetShardActualIds()) {
            if (shardsInspected == maxShardsToQuery) {
                break;
            }
            Cerr << ">>> shard actual id: " << shardId << Endl;
            for (auto&& pathId : csController->GetPathIds(shardId)) {
                Cerr << ">>> path id: " << pathId << Endl;
                pathids.insert(pathId);
            }
            ++shardsInspected;
        }

        UNIT_ASSERT_VALUES_EQUAL(pathids.size(), expectedPathCount);
        const ui64 pathId = *pathids.begin();

        // Ask the same shards for statistics of that path. The number of requests
        // actually sent is remembered so that exactly that many replies are awaited.
        size_t requestsSent = 0;
        for (auto&& shardId : csController->GetShardActualIds()) {
            if (requestsSent == maxShardsToQuery) {
                break;
            }
            auto request = std::make_unique<NStat::TEvStatistics::TEvStatisticsRequest>();
            request->Record.MutableTable()->MutablePathId()->SetLocalId(pathId);

            runtime->Send(MakePipePerNodeCacheID(false), sender, new TEvPipeCache::TEvForward(
                request.release(), shardId, false));
            ++requestsSent;
        }
        UNIT_ASSERT_C(requestsSent > 0, "no statistics requests were sent: controller reported no shards");

        // Merge the sketch of the first column from every reply into one accumulator.
        TAutoPtr<IEventHandle> handle;
        auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::Create());
        for (size_t responseIdx = 0; responseIdx < requestsSent; ++responseIdx) {
            auto event = runtime->GrabEdgeEvent<NStat::TEvStatistics::TEvStatisticsResponse>(handle);
            UNIT_ASSERT(event);

            auto& response = event->Record;
            UNIT_ASSERT_VALUES_EQUAL(response.GetStatus(), NKikimrStat::TEvStatisticsResponse::STATUS_SUCCESS);
            UNIT_ASSERT_VALUES_EQUAL(response.ColumnsSize(), expectedColumnCount);
            // Guard the indexed access below so a missing statistic fails the test cleanly.
            UNIT_ASSERT_C(response.GetColumns(0).StatisticsSize() > 0, "first column carries no statistics");

            const auto& sketchData = response.GetColumns(0).GetStatistics(0).GetData();
            *sketch += *std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchData.data(), sketchData.size()));
            Cerr << ">>> sketch.GetElementCount() = " << sketch->GetElementCount() << Endl;
            UNIT_ASSERT(sketch->GetElementCount() > 0);
        }
    }
}

Y_UNIT_TEST(SchemeActualizationOnceOnStart) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
Expand Down Expand Up @@ -194,7 +319,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() << Sprintf(
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`);
)", StorageId.data());
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand All @@ -203,7 +328,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
}
{
auto alterQuery = TStringBuilder() << Sprintf(
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`);
)", StorageId.data()
);
Expand Down Expand Up @@ -347,7 +472,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand All @@ -357,7 +482,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid", "resource_id"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand All @@ -367,7 +492,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.005}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand All @@ -377,7 +502,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.01}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,11 @@ message TRequestedMaxIndex {
optional string ColumnName = 1;
}

// Parameters of a requested count-min-sketch index.
// Used as the `CountMinSketch` alternative of `TOlapIndexRequested.Implementation`.
message TRequestedCountMinSketch {
// Names of the source columns; the sketch is built on the combined data
// from this set of columns.
repeated string ColumnNames = 1;
}

message TOlapIndexRequested {
optional string Name = 1;
optional TCompressionOptions Compression = 3;
Expand All @@ -491,6 +496,7 @@ message TOlapIndexRequested {
oneof Implementation {
TRequestedBloomFilter BloomFilter = 40;
TRequestedMaxIndex MaxIndex = 41;
TRequestedCountMinSketch CountMinSketch = 42;
}
}

Expand All @@ -504,6 +510,10 @@ message TMaxIndex {
optional uint32 ColumnId = 1;
}

// Count-min-sketch variant of `TOlapIndexDescription.Implementation`.
message TCountMinSketch {
// Ids of the columns covered by the sketch.
// NOTE(review): presumably the resolved ids of TRequestedCountMinSketch.ColumnNames — confirm.
repeated uint32 ColumnIds = 1;
}

message TOlapIndexDescription {
// This id is auto-generated by schemeshard
optional uint32 Id = 1;
Expand All @@ -517,6 +527,7 @@ message TOlapIndexDescription {
oneof Implementation {
TBloomFilter BloomFilter = 41;
TMaxIndex MaxIndex = 42;
TCountMinSketch CountMinSketch = 43;
}
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/out/out.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrDataEvents::TEvWrite::ETxMode, stream, value) {
// Stream output for the analyze-status response code: prints the enum value's
// name as returned by the generated `_Name` helper.
Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvAnalyzeStatusResponse_EStatus, stream, value) {
    const auto& statusName = NKikimrStat::TEvAnalyzeStatusResponse_EStatus_Name(value);
    stream << statusName;
}

// Stream output for the statistics response status: prints the enum value's
// name as returned by the generated `EStatus_Name` helper.
Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvStatisticsResponse::EStatus, stream, value) {
    const auto& statusName = NKikimrStat::TEvStatisticsResponse::EStatus_Name(value);
    stream << statusName;
}
Loading

0 comments on commit 78bb468

Please sign in to comment.