diff --git a/ydb/core/formats/arrow/protos/ssa.proto b/ydb/core/formats/arrow/protos/ssa.proto index 5ffbf067b33d..193c759a3a80 100644 --- a/ydb/core/formats/arrow/protos/ssa.proto +++ b/ydb/core/formats/arrow/protos/ssa.proto @@ -45,6 +45,9 @@ message TProgram { repeated uint64 HashValues = 1; } + message TCountMinSketchChecker { + } + message TOlapIndexChecker { optional uint32 IndexId = 1; optional string ClassName = 2; @@ -56,6 +59,7 @@ message TProgram { oneof Implementation { TBloomFilterChecker BloomFilter = 40; TCompositeChecker Composite = 41; + TCountMinSketchChecker CountMinSketch = 42; } } diff --git a/ydb/core/kqp/ut/olap/indexes_ut.cpp b/ydb/core/kqp/ut/olap/indexes_ut.cpp index 7699cd8d8b6e..13e98b57e9b6 100644 --- a/ydb/core/kqp/ut/olap/indexes_ut.cpp +++ b/ydb/core/kqp/ut/olap/indexes_ut.cpp @@ -6,6 +6,8 @@ #include #include +#include + #include namespace NKikimr::NKqp { @@ -59,7 +61,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { { auto alterQuery = TStringBuilder() << - R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`); )"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); @@ -68,7 +70,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { } { auto alterQuery = TStringBuilder() << - R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER, + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER, FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05}`); )"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); @@ -105,6 +107,129 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { } } + Y_UNIT_TEST(CountMinSketchIndex) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); + csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30); + + TLocalHelper(kikimr).CreateTestOlapTable(); + auto tableClient = kikimr.GetTableClient(); + + Tests::NCommon::TLoggerInit(kikimr).SetComponents({NKikimrServices::TX_COLUMNSHARD}, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize(); + + { + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=cms_ts, TYPE=COUNT_MIN_SKETCH, + FEATURES=`{"column_names" : ['timestamp']}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + + { + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=cms_res_id, TYPE=COUNT_MIN_SKETCH, + FEATURES=`{"column_names" : ['resource_id']}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + + { + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=cms_uid, TYPE=COUNT_MIN_SKETCH, + FEATURES=`{"column_names" : ['uid']}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + + { + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=cms_level, TYPE=COUNT_MIN_SKETCH, + FEATURES=`{"column_names" : ['level']}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + + { + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=cms_message, TYPE=COUNT_MIN_SKETCH, + FEATURES=`{"column_names" : ['message']}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000); + + csController->WaitActualization(TDuration::Seconds(10)); + { + auto runtime = kikimr.GetTestServer().GetRuntime(); + auto sender = runtime->AllocateEdgeActor(); + + TAutoPtr handle; + + size_t shard = 0; + std::set pathids; + for (auto&& i : csController->GetShardActualIds()) { + Cerr << ">>> shard actual id: " << i << Endl; + for (auto&& j : csController->GetPathIds(i)) { + Cerr << ">>> path id: " << j << Endl; + pathids.insert(j); + } + if (++shard == 3) + break; + } + + UNIT_ASSERT(pathids.size() == 1); + ui64 pathId = *pathids.begin(); + + shard = 0; + for (auto&& i : csController->GetShardActualIds()) { + auto request = std::make_unique(); + request->Record.MutableTable()->MutablePathId()->SetLocalId(pathId); + + runtime->Send(MakePipePerNodeCacheID(false), sender, new TEvPipeCache::TEvForward( + request.release(), i, false)); + if (++shard == 3) + break; + } + + auto sketch = std::unique_ptr(TCountMinSketch::Create()); + for (size_t shard = 0; shard < 3; ++shard) { + auto event = runtime->GrabEdgeEvent(handle); + UNIT_ASSERT(event); + + auto& response = event->Record; + // Cerr << response << Endl; + UNIT_ASSERT_VALUES_EQUAL(response.GetStatus(), NKikimrStat::TEvStatisticsResponse::STATUS_SUCCESS); + UNIT_ASSERT(response.ColumnsSize() == 5); + TString someData = response.GetColumns(0).GetStatistics(0).GetData(); + *sketch += *std::unique_ptr(TCountMinSketch::FromString(someData.data(), someData.size())); + Cerr << ">>> sketch.GetElementCount() = " << sketch->GetElementCount() << Endl; + UNIT_ASSERT(sketch->GetElementCount() > 0); + } + } + } + Y_UNIT_TEST(SchemeActualizationOnceOnStart) { auto settings = TKikimrSettings() .SetWithSampleTables(false); @@ -194,7 +319,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { { auto alterQuery = TStringBuilder() << Sprintf( - R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`); )", StorageId.data()); auto session = tableClient.CreateSession().GetValueSync().GetSession(); @@ -203,7 +328,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { } { auto alterQuery = TStringBuilder() << Sprintf( - R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER, + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER, FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`); )", StorageId.data() ); @@ -347,7 +472,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { { auto alterQuery = TStringBuilder() << - R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`); )"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); @@ -357,7 +482,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { { auto alterQuery = TStringBuilder() << - R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, FEATURES=`{"column_names" : ["uid", "resource_id"], "false_positive_probability" : 0.05}`); )"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); @@ -367,7 +492,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { { auto alterQuery = TStringBuilder() << - R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.005}`); )"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); @@ -377,7 +502,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { { auto alterQuery = TStringBuilder() << - R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.01}`); )"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 08783b3c5370..6a0d8bff5b10 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -482,6 +482,11 @@ message TRequestedMaxIndex { optional string ColumnName = 1; } +message TRequestedCountMinSketch { + // sketch built on the combined data from the set of columns + repeated string ColumnNames = 1; +} + message TOlapIndexRequested { optional string Name = 1; optional TCompressionOptions Compression = 3; @@ -491,6 +496,7 @@ message TOlapIndexRequested { oneof Implementation { TRequestedBloomFilter BloomFilter = 40; TRequestedMaxIndex MaxIndex = 41; + TRequestedCountMinSketch CountMinSketch = 42; } } @@ -504,6 +510,10 @@ message TMaxIndex { optional uint32 ColumnId = 1; } +message TCountMinSketch { + repeated uint32 ColumnIds = 1; +} + message TOlapIndexDescription { // This id is auto-generated by schemeshard optional uint32 Id = 1; @@ -517,6 +527,7 @@ message TOlapIndexDescription { oneof Implementation { TBloomFilter BloomFilter = 41; TMaxIndex MaxIndex = 42; + TCountMinSketch CountMinSketch = 43; } } diff --git a/ydb/core/protos/out/out.cpp b/ydb/core/protos/out/out.cpp index 735ac9b34a44..f85c5b9c0256 100644 --- a/ydb/core/protos/out/out.cpp +++ b/ydb/core/protos/out/out.cpp @@ -243,3 +243,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrDataEvents::TEvWrite::ETxMode, stream, value) { Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvAnalyzeStatusResponse_EStatus, stream, value) { stream << NKikimrStat::TEvAnalyzeStatusResponse_EStatus_Name(value); } + +Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvStatisticsResponse::EStatus, stream, value) { + stream << NKikimrStat::TEvStatisticsResponse::EStatus_Name(value); +} diff --git a/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp index e8fb802d895c..344707d37d9a 100644 --- a/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp @@ -11,6 +11,24 @@ namespace NKikimr { namespace NStat { +// TODO: check for arbitrary set of values of type T (including frequent duplicates) +// numbers (1..N) were count as a sketch. Check sketch properties +bool CheckCountMinSketch(const std::shared_ptr& sketch, const ui32 N) { + UNIT_ASSERT(sketch->GetElementCount() == N); + const double eps = 1. / sketch->GetWidth(); + const double delta = 1. / (1 << sketch->GetDepth()); + size_t failedEstimatesCount = 0; + for (ui32 i = 0; i < N; ++i) { + const ui32 trueCount = 1; // true count of value i + auto probe = sketch->Probe((const char *)&i, sizeof(i)); + if (probe > trueCount + eps * N) { + failedEstimatesCount++; + } + } + Cerr << ">>> failedEstimatesCount = " << failedEstimatesCount << Endl; + return failedEstimatesCount < delta * N; +} + Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTable) { @@ -20,7 +38,9 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { runtime.SimulateSleep(TDuration::Seconds(30)); - ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10); + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + + UNIT_ASSERT(CheckCountMinSketch(countMin, 1000000)); } Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeResolve) { @@ -44,7 +64,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { runtime.SimulateSleep(TDuration::Seconds(10)); - ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10); + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + UNIT_ASSERT(CheckCountMinSketch(countMin, 1000000)); } Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeReqDistribution) { @@ -63,7 +84,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { observer.Remove(); RebootTablet(runtime, tableInfo.SaTabletId, sender); - ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10); + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + UNIT_ASSERT(CheckCountMinSketch(countMin, 1000000)); } Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeAggregate) { @@ -82,7 +104,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { observer.Remove(); RebootTablet(runtime, tableInfo.SaTabletId, sender); - ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10); + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + UNIT_ASSERT(CheckCountMinSketch(countMin, 1000000)); } Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeSave) { @@ -101,7 +124,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { observer.Remove(); RebootTablet(runtime, tableInfo.SaTabletId, sender); - ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10); + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + UNIT_ASSERT(CheckCountMinSketch(countMin, 1000000)); } Y_UNIT_TEST(TraverseColumnTableRebootSaTabletInAggregate) { @@ -121,7 +145,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { observer.Remove(); RebootTablet(runtime, tableInfo.SaTabletId, sender); - ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10); + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + UNIT_ASSERT(CheckCountMinSketch(countMin, 1000000)); } Y_UNIT_TEST(TraverseColumnTableHiveDistributionZeroNodes) { @@ -165,7 +190,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { runtime.SimulateSleep(TDuration::Seconds(30)); - ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10); + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + UNIT_ASSERT(CheckCountMinSketch(countMin, 1000000)); } Y_UNIT_TEST(TraverseColumnTableHiveDistributionAbsentNodes) { @@ -201,7 +227,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { runtime.SimulateSleep(TDuration::Seconds(30)); - ValidateCountMinColumnshard(runtime, tableInfo.PathId, 10); + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + UNIT_ASSERT(CheckCountMinSketch(countMin, 1000000)); } Y_UNIT_TEST(TraverseColumnTableAggrStatUnavailableNode) { @@ -232,7 +259,13 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { runtime.SimulateSleep(TDuration::Seconds(30)); - ValidateCountMinColumnshard(runtime, tableInfo.PathId, 11); // 10 for first round, 1 for second + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + + ui32 value = 1; + auto probe = countMin->Probe((const char *)&value, sizeof(value)); + Cerr << "probe = " << probe << Endl; + const double eps = 1. / countMin->GetWidth(); + UNIT_ASSERT(probe <= 1 + eps * 1100000); // 10 for first round, 1 for second } Y_UNIT_TEST(TraverseColumnTableAggrStatNonLocalTablet) { @@ -263,7 +296,13 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { runtime.SimulateSleep(TDuration::Seconds(60)); - ValidateCountMinColumnshard(runtime, tableInfo.PathId, 11); // 10 for first round, 1 for second + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + + ui32 value = 1; + auto probe = countMin->Probe((const char *)&value, sizeof(value)); + Cerr << "probe = " << probe << Endl; + const double eps = 1. / countMin->GetWidth(); + UNIT_ASSERT(probe <= 1 + eps * 1100000); // 10 for first round, 1 for second } } diff --git a/ydb/core/statistics/service/service_impl.cpp b/ydb/core/statistics/service/service_impl.cpp index 9ca127fa3a70..c81bb8f61704 100644 --- a/ydb/core/statistics/service/service_impl.cpp +++ b/ydb/core/statistics/service/service_impl.cpp @@ -897,7 +897,7 @@ class TStatService : public TActorBootstrapped { void SendStatisticsRequest(const TActorId& clientId) { auto request = std::make_unique(); auto& record = request->Record; - record.MutableTypes()->Add(NKikimr::NStat::COUNT_MIN_SKETCH); + record.MutableTypes()->Add(NKikimrStat::TYPE_COUNT_MIN_SKETCH); auto* path = record.MutableTable()->MutablePathId(); path->SetOwnerId(AggregationStatistics.PathId.OwnerId); diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index ae92cb1885fc..39a55920ad0c 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -238,21 +238,35 @@ void CreateColumnStoreTable(TTestEnv& env, const TString& databaseName, const TS )", fullTableName.c_str(), shardCount)).GetValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - NYdb::TValueBuilder rows; - rows.BeginList(); - for (size_t i = 0; i < 100; ++i) { - auto key = TValueBuilder().Uint64(i).Build(); - auto value = TValueBuilder().OptionalUint64(i).Build(); - rows.AddListItem(); - rows.BeginStruct(); - rows.AddMember("Key", key); - rows.AddMember("Value", value); - rows.EndStruct(); - } - rows.EndList(); + result = session.ExecuteSchemeQuery(Sprintf(R"( + ALTER OBJECT `%s` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_key, TYPE=COUNT_MIN_SKETCH, + FEATURES=`{"column_names" : ['Key']}`); + )", fullTableName.c_str())).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - result = client.BulkUpsert(fullTableName, rows.Build()).GetValueSync(); + result = session.ExecuteSchemeQuery(Sprintf(R"( + ALTER OBJECT `%s` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_value, TYPE=COUNT_MIN_SKETCH, + FEATURES=`{"column_names" : ['Value']}`); + )", fullTableName.c_str())).GetValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + for (size_t bulk = 0; bulk < 5; ++bulk) { + NYdb::TValueBuilder rows; + rows.BeginList(); + for (size_t i = 0; i < 1000000; ++i) { + auto key = TValueBuilder().Uint64(i).Build(); + auto value = TValueBuilder().OptionalUint64(i).Build(); + rows.AddListItem(); + rows.BeginStruct(); + rows.AddMember("Key", key); + rows.AddMember("Value", value); + rows.EndStruct(); + } + rows.EndList(); + + result = client.BulkUpsert(fullTableName, rows.Build()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } } std::vector CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { @@ -267,7 +281,7 @@ std::vector CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount auto& runtime = *env.GetServer().GetRuntime(); auto sender = runtime.AllocateEdgeActor(); - runtime.SimulateSleep(TDuration::Seconds(10)); + runtime.SimulateSleep(TDuration::Seconds(30)); initThread.join(); std::vector ret; diff --git a/ydb/core/tx/columnshard/columnshard__statistics.cpp b/ydb/core/tx/columnshard/columnshard__statistics.cpp index 86fbb2482cf8..84b1a89982b8 100644 --- a/ydb/core/tx/columnshard/columnshard__statistics.cpp +++ b/ydb/core/tx/columnshard/columnshard__statistics.cpp @@ -1,7 +1,12 @@ #include "columnshard.h" #include "columnshard_impl.h" +#include "ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h" #include +#include + +#include + namespace NKikimr::NColumnShard { @@ -22,23 +27,73 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const } void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, const TActorContext&) { + const auto& record = ev->Get()->Record; + auto response = std::make_unique(); - auto& record = response->Record; - record.SetShardTabletId(TabletID()); + auto& respRecord = response->Record; + respRecord.SetShardTabletId(TabletID()); + + if (record.TypesSize() > 0 && (record.TypesSize() > 1 || record.GetTypes(0) != NKikimrStat::TYPE_COUNT_MIN_SKETCH)) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "Unsupported statistic type in statistics request"); + + respRecord.SetStatus(NKikimrStat::TEvStatisticsResponse::STATUS_ERROR); + + Send(ev->Sender, response.release(), 0, ev->Cookie); + return; + } + + AFL_VERIFY(HasIndex()); + auto index = GetIndexAs(); + auto spg = index.GetGranuleOptional(record.GetTable().GetPathId().GetLocalId()); + AFL_VERIFY(spg); + + std::set columnTagsRequested; + for (ui32 tag : record.GetTable().GetColumnTags()) { + columnTagsRequested.insert(tag); + } + if (columnTagsRequested.empty()) { + auto schema = index.GetVersionedIndex().GetLastSchema(); + auto allColumnIds = schema->GetIndexInfo().GetColumnIds(false); + columnTagsRequested = std::set(allColumnIds.begin(), allColumnIds.end()); + } + + std::map> sketchesByColumns; + for (auto id : columnTagsRequested) { + sketchesByColumns.emplace(id, TCountMinSketch::Create()); + } + + for (const auto& [_, portionInfo] : spg->GetPortions()) { + if (portionInfo->IsVisible(GetMaxReadVersion())) { + std::shared_ptr portionSchema = portionInfo->GetSchema(index.GetVersionedIndex()); + for (ui32 columnId : columnTagsRequested) { + auto indexMeta = portionSchema->GetIndexInfo().GetIndexMetaCountMinSketch({columnId}); + + if (!indexMeta) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "Missing countMinSketch index for columnId " + ToString(columnId)); + continue; + } + AFL_VERIFY(indexMeta->GetColumnIds().size() == 1); + + const std::vector data = portionInfo->GetIndexInplaceDataVerified(indexMeta->GetIndexId()); - record.SetStatus(NKikimrStat::TEvStatisticsResponse::STATUS_SUCCESS); + for (const auto& sketchAsString : data) { + auto sketch = std::unique_ptr(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size())); + *sketchesByColumns[columnId] += *sketch; + } + } + } + } - std::unique_ptr sketch(TCountMinSketch::Create()); - ui32 value = 1; - sketch->Count((const char*)&value, sizeof(value)); - TString strSketch(sketch->AsStringBuf()); + respRecord.SetStatus(NKikimrStat::TEvStatisticsResponse::STATUS_SUCCESS); - auto* column = record.AddColumns(); - column->SetTag(1); + for (ui32 columnTag : columnTagsRequested) { + auto* column = respRecord.AddColumns(); + column->SetTag(columnTag); - auto* statistic = column->AddStatistics(); - statistic->SetType(NStat::COUNT_MIN_SKETCH); - statistic->SetData(std::move(strSketch)); + auto* statistic = column->AddStatistics(); + statistic->SetType(NStat::COUNT_MIN_SKETCH); + statistic->SetData(TString(sketchesByColumns[columnTag]->AsStringBuf())); + } Send(ev->Sender, response.release(), 0, ev->Cookie); } diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 1e4bbab0d589..f0ff74084dab 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -336,7 +337,7 @@ void TIndexInfo::InitializeCaches(const std::shared_ptr& opera for (auto&& c : Columns) { AFL_VERIFY(ArrowColumnByColumnIdCache.emplace(c.first, GetColumnFieldVerified(c.first)).second); - AFL_VERIFY(ColumnFeatures.emplace(c.first, TColumnFeatures(c.first, GetColumnFieldVerified(c.first), DefaultSerializer, operators->GetDefaultOperator(), + AFL_VERIFY(ColumnFeatures.emplace(c.first, TColumnFeatures(c.first, GetColumnFieldVerified(c.first), DefaultSerializer, operators->GetDefaultOperator(), NArrow::IsPrimitiveYqlType(c.second.PType), c.first == GetPKFirstColumnId(), nullptr)).second); } for (auto&& cId : GetSystemColumnIds()) { @@ -395,7 +396,7 @@ NKikimr::TConclusionStatus TIndexInfo::AppendIndex(const THashMap TIndexInfo::GetIndexMax(const ui32 columnId) const { +std::shared_ptr TIndexInfo::GetIndexMetaMax(const ui32 columnId) const { for (auto&& i : Indexes) { if (i.second->GetClassName() != NIndexes::NMax::TIndexMeta::GetClassNameStatic()) { continue; @@ -408,6 +409,19 @@ std::shared_ptr TIndexInfo::GetIndexMax(const ui32 c return nullptr; } +std::shared_ptr TIndexInfo::GetIndexMetaCountMinSketch(const std::set& columnIds) const { + for (auto&& i : Indexes) { + if (i.second->GetClassName() != NIndexes::NCountMinSketch::TIndexMeta::GetClassNameStatic()) { + continue; + } + auto index = static_pointer_cast(i.second.GetObjectPtr()); + if (index->GetColumnIds() == columnIds) { + return index; + } + } + return nullptr; +} + std::vector TIndexInfo::GetEntityIds() const { auto result = GetColumnIds(true); for (auto&& i : Indexes) { diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index be70ac1ae310..b2be86d1e964 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -28,6 +28,10 @@ namespace NIndexes::NMax { class TIndexMeta; } +namespace NIndexes::NCountMinSketch { +class TIndexMeta; +} + namespace NStorageOptimizer { class IOptimizerPlannerConstructor; } @@ -224,7 +228,8 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema, public IIndexInfo { return result; } - std::shared_ptr GetIndexMax(const ui32 columnId) const; + std::shared_ptr GetIndexMetaMax(const ui32 columnId) const; + std::shared_ptr GetIndexMetaCountMinSketch(const std::set& columnIds) const; [[nodiscard]] TConclusionStatus AppendIndex(const THashMap>>& originalData, const ui32 indexId, const std::shared_ptr& operators, TSecondaryData& result) const; diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp index 55882102eee9..f664eb6afb6a 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp @@ -26,7 +26,7 @@ std::optional TTieringActualizer::Bu if (Tiering) { AFL_VERIFY(TieringColumnId); - auto indexMeta = portionSchema->GetIndexInfo().GetIndexMax(*TieringColumnId); + auto indexMeta = portionSchema->GetIndexInfo().GetIndexMetaMax(*TieringColumnId); std::shared_ptr max; if (!indexMeta) { max = portion.MaxValue(*TieringColumnId); diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp index 19d283d7f9e5..fc89fb8b8ada 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp @@ -66,4 +66,4 @@ void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr +#include +#include +#include + +namespace NKikimr::NOlap::NIndexes::NCountMinSketch { + +void TCountMinSketchChecker::DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const { + proto.MutableCountMinSketch(); +} + +bool TCountMinSketchChecker::DoCheckImpl(const std::vector& blobs) const { + Y_UNUSED(blobs); + return true; +} + +bool TCountMinSketchChecker::DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) { + return proto.HasCountMinSketch(); +} + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/checker.h b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/checker.h new file mode 100644 index 000000000000..ffa073b2e400 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/checker.h @@ -0,0 +1,32 @@ +#pragma once +#include + +namespace NKikimr::NOlap::NIndexes::NCountMinSketch { + +class TCountMinSketchChecker: public TSimpleIndexChecker { +public: + static TString GetClassNameStatic() { + return "COUNT_MIN_SKETCH"; + } +private: + using TBase = TSimpleIndexChecker; + static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); + +protected: + virtual bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override; + virtual void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override; + + virtual bool DoCheckImpl(const std::vector& blobs) const override; + +public: + TCountMinSketchChecker() = default; + TCountMinSketchChecker(const ui32 indexId) + : TBase(indexId) + {} + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp new file mode 100644 index 000000000000..7dcbaa9db476 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp @@ -0,0 +1,63 @@ +#include "constructor.h" +#include "meta.h" + +#include + +namespace NKikimr::NOlap::NIndexes::NCountMinSketch { + +std::shared_ptr TCountMinSketchConstructor::DoCreateIndexMeta(const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { + std::set columnIds; + if (ColumnNames.empty()) { + for (const auto& [id, _] : currentSchema.GetColumns().GetColumns()) { + AFL_VERIFY(columnIds.emplace(id).second); + } + } + for (auto&& i : ColumnNames) { + auto* columnInfo = currentSchema.GetColumns().GetByName(i); + if (!columnInfo) { + errors.AddError("no column with name " + i); + return nullptr; + } + AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second); + } + return std::make_shared(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::LocalMetadataStorageId), columnIds); +} + +NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + if (!jsonInfo.Has("column_names")) { + return TConclusionStatus::Fail("column_names have to be in count min sketch features"); + } + const NJson::TJsonValue::TArray* columnNamesArray; + if (!jsonInfo["column_names"].GetArrayPointer(&columnNamesArray)) { + return TConclusionStatus::Fail("column_names have to be in count min sketch features as array ['column_name_1', ... , 'column_name_N']"); + } + for (auto&& i : *columnNamesArray) { + if (!i.IsString()) { + return TConclusionStatus::Fail("column_names have to be in count min sketch features as array of strings ['column_name_1', ... , 'column_name_N']"); + } + ColumnNames.emplace(i.GetString()); + } + return TConclusionStatus::Success(); +} + +NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) { + if (!proto.HasCountMinSketch()) { + const TString errorMessage = "not found CountMinSketch section in proto: \"" + proto.DebugString() + "\""; + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage); + return TConclusionStatus::Fail(errorMessage); + } + auto& sketch = proto.GetCountMinSketch(); + for (auto&& i : sketch.GetColumnNames()) { + ColumnNames.emplace(i); + } + return TConclusionStatus::Success(); +} + +void TCountMinSketchConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const { + auto* sketchProto = proto.MutableCountMinSketch(); + for (auto&& i : ColumnNames) { + sketchProto->AddColumnNames(i); + } +} + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.h b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.h new file mode 100644 index 000000000000..86d7e34fa577 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.h @@ -0,0 +1,31 @@ +#pragma once +#include + +namespace NKikimr::NOlap::NIndexes::NCountMinSketch { + +class TCountMinSketchConstructor: public IIndexMetaConstructor { +public: + static TString GetClassNameStatic() { + return "COUNT_MIN_SKETCH"; + } +private: + std::set ColumnNames; + static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); + +protected: + virtual std::shared_ptr DoCreateIndexMeta(const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override; + + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; + + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override; + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override; + +public: + TCountMinSketchConstructor() = default; + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp new file mode 100644 index 000000000000..5e0465848b29 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp @@ -0,0 +1,56 @@ +#include "meta.h" +#include "checker.h" +#include +#include +#include +#include +#include + +#include +#include + +namespace NKikimr::NOlap::NIndexes::NCountMinSketch { + +TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const { + auto sketch = std::unique_ptr(TCountMinSketch::Create()); + + for (auto& colReader : reader) { + for (colReader.Start(); colReader.IsCorrect(); colReader.ReadNextChunk()) { + auto array = colReader.GetCurrentChunk(); + + NArrow::SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t; + using TArray = typename arrow::TypeTraits::ArrayType; + + const TArray& arrTyped = static_cast(*array); + if constexpr (arrow::has_c_type()) { + for (int64_t i = 0; i < arrTyped.length(); ++i) { + auto cell = TCell::Make(arrTyped.Value(i)); + sketch->Count(cell.Data(), cell.Size()); + } + return true; + } + if constexpr (arrow::has_string_view()) { + for (int64_t i = 0; i < arrTyped.length(); ++i) { + auto view = arrTyped.GetView(i); + sketch->Count(view.data(), view.size()); + } + return true; + } + AFL_VERIFY(false)("message", "Unsupported arrow type for building an index"); + return false; + }); + } + } + + TString result(sketch->AsStringBuf()); + return result; +} + +void TIndexMeta::DoFillIndexCheckers(const std::shared_ptr& info, const NSchemeShard::TOlapSchema& /*schema*/) const { + for (auto&& branch : info->GetBranches()) { + branch->MutableIndexes().emplace_back(std::make_shared(GetIndexId())); + } +} + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h new file mode 100644 index 000000000000..2c23af1fefdb --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h @@ -0,0 +1,63 @@ +#pragma once +#include + +namespace NKikimr::NOlap::NIndexes::NCountMinSketch { + +class TIndexMeta: public TIndexByColumns { +public: + static TString GetClassNameStatic() { + return "COUNT_MIN_SKETCH"; + } + +private: + using TBase = TIndexByColumns; + + static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); + +protected: + virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const override { + const auto* bMeta = dynamic_cast(&newMeta); + if (!bMeta) { + return TConclusionStatus::Fail("cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName()); + } + return TBase::CheckSameColumnsForModification(newMeta); + } + + virtual void DoFillIndexCheckers(const std::shared_ptr& info, const NSchemeShard::TOlapSchema& schema) const override; + + virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader) const override; + + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override { + AFL_VERIFY(TBase::DoDeserializeFromProto(proto)); + AFL_VERIFY(proto.HasCountMinSketch()); + auto& sketch = proto.GetCountMinSketch(); + for (auto&& i : sketch.GetColumnIds()) { + ColumnIds.emplace(i); + } + return true; + } + + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override { + auto* sketchProto = proto.MutableCountMinSketch(); + for (auto&& i : ColumnIds) { + sketchProto->AddColumnIds(i); + } + } + +public: + TIndexMeta() = default; + TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const std::set& columnIds) + : TBase(indexId, indexName, columnIds, storageId) { + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + + const std::set& GetColumnIds() const { + return ColumnIds; + } + +}; + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/ya.make new file mode 100644 index 000000000000..bcba53e477ae --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + GLOBAL constructor.cpp + GLOBAL meta.cpp + GLOBAL checker.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/storage/indexes/portions +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp index e62bc99d0a7f..3f8634cac619 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp @@ -53,4 +53,4 @@ NKikimr::TConclusionStatus TIndexByColumns::CheckSameColumnsForModification(cons return TConclusionStatus::Success(); } -} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h index 427ee98d99d2..5356d5c4302d 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h @@ -9,8 +9,10 @@ class TIndexByColumns: public IIndexMeta { private: using TBase = IIndexMeta; std::shared_ptr Serializer; + protected: std::set ColumnIds; + virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader) const = 0; virtual std::shared_ptr DoBuildIndex(const THashMap>>& data, const TIndexInfo& indexInfo) const override final; @@ -23,4 +25,4 @@ class TIndexByColumns: public IIndexMeta { TIndexByColumns(const ui32 indexId, const TString& indexName, const std::set& columnIds, const TString& storageId); }; -} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/ya.make index 2edfa9332cd4..0459c906d836 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/indexes/ya.make @@ -4,6 +4,7 @@ PEERDIR( ydb/core/tx/columnshard/engines/storage/indexes/portions ydb/core/tx/columnshard/engines/storage/indexes/bloom ydb/core/tx/columnshard/engines/storage/indexes/max + ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch ) END() diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index e86806da299a..021ea7e47a4f 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -61,7 +61,7 @@ class TChunkedColumnReader { ui32 CurrentRecordIndex = 0; public: TChunkedColumnReader(const std::vector>& chunks, const std::shared_ptr& loader) - : Chunks(chunks) + : Chunks(chunks) , Loader(loader) { Start(); diff --git a/ydb/library/minsketch/count_min_sketch.h b/ydb/library/minsketch/count_min_sketch.h index c64b9250dd48..4b8866d3d921 100644 --- a/ydb/library/minsketch/count_min_sketch.h +++ b/ydb/library/minsketch/count_min_sketch.h @@ -3,8 +3,6 @@ #include #include -#include - namespace NKikimr { class TCountMinSketch {