From 0657aba590940ebf1f755d585b8fd85e35626d84 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Tue, 31 Dec 2024 13:13:32 +0300 Subject: [PATCH] fix blob range construction and index control (#13131) --- ydb/core/kqp/ut/olap/indexes_ut.cpp | 5 +- ydb/core/kqp/ut/olap/sys_view_ut.cpp | 2 + ydb/core/protos/flat_scheme_op.proto | 2 + .../blobs_action/abstract/storage.cpp | 6 +++ .../blobs_action/abstract/storage.h | 26 ++++++----- .../columnshard/blobs_action/local/storage.h | 2 +- .../columnshard/engines/scheme/index_info.cpp | 3 +- .../scheme/versions/abstract_scheme.cpp | 8 ++-- .../engines/storage/indexes/bloom/checker.h | 22 ++++++++- .../storage/indexes/bloom_ngramm/checker.cpp | 3 ++ .../storage/indexes/bloom_ngramm/const.cpp | 4 ++ .../storage/indexes/bloom_ngramm/const.h | 7 +++ .../indexes/bloom_ngramm/constructor.cpp | 15 +++++- .../indexes/bloom_ngramm/constructor.h | 1 + .../storage/indexes/bloom_ngramm/meta.cpp | 27 +++++++---- .../storage/indexes/bloom_ngramm/meta.h | 46 ++++++++++++++----- .../columnshard/engines/ut/ut_logs_engine.cpp | 2 +- .../engines/writer/indexed_blob_constructor.h | 3 +- .../tx/columnshard/hooks/abstract/abstract.h | 9 ++++ .../tx/columnshard/hooks/testing/controller.h | 11 ++++- .../operations/slice_builder/builder.cpp | 7 +-- ydb/core/tx/columnshard/splitter/settings.h | 4 ++ .../ut_rw/ut_columnshard_read_write.cpp | 2 + 23 files changed, 169 insertions(+), 48 deletions(-) diff --git a/ydb/core/kqp/ut/olap/indexes_ut.cpp b/ydb/core/kqp/ut/olap/indexes_ut.cpp index 31495e1a1ac5..be14ae5ec703 100644 --- a/ydb/core/kqp/ut/olap/indexes_ut.cpp +++ b/ydb/core/kqp/ut/olap/indexes_ut.cpp @@ -23,6 +23,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30); csController->SetOverrideMemoryLimitForPortionReading(1e+10); + csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings()); TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); @@ -103,6 +104,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30); + csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings()); TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); @@ -347,6 +349,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30); csController->SetOverrideMemoryLimitForPortionReading(1e+10); + csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings()); TLocalHelper(*Kikimr).CreateTestOlapTable(); auto tableClient = Kikimr->GetTableClient(); @@ -367,7 +370,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { auto alterQuery = TStringBuilder() << Sprintf( R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_uid, TYPE=BLOOM_NGRAMM_FILTER, - FEATURES=`{"column_name" : "resource_id", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 64024}`); + FEATURES=`{"column_name" : "resource_id", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 512, "records_count" : 1024}`); )", StorageId.data()); auto session = tableClient.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/kqp/ut/olap/sys_view_ut.cpp b/ydb/core/kqp/ut/olap/sys_view_ut.cpp index 6ad4d8b3def9..9d12b54efab6 100644 --- a/ydb/core/kqp/ut/olap/sys_view_ut.cpp +++ b/ydb/core/kqp/ut/olap/sys_view_ut.cpp @@ -115,6 +115,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 bytesPK1; { auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings()); auto settings = TKikimrSettings() .SetWithSampleTables(false); TKikimrRunner kikimr(settings); @@ -127,6 +128,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { } auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings()); ui64 rawBytesUnpack1PK = 0; ui64 bytesUnpack1PK = 0; ui64 rawBytesPackAndUnpack2PK; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 541057c4e360..289822c5e4b3 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -474,6 +474,7 @@ message TRequestedBloomNGrammFilter { optional uint32 FilterSizeBytes = 2; optional uint32 HashesCount = 3; optional string ColumnName = 4; + optional uint32 RecordsCount = 5; } message TRequestedMaxIndex { @@ -510,6 +511,7 @@ message TBloomNGrammFilter { optional uint32 FilterSizeBytes = 2; optional uint32 HashesCount = 3; optional uint32 ColumnId = 4; + optional uint32 RecordsCount = 5; } message TMaxIndex { diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/storage.cpp b/ydb/core/tx/columnshard/blobs_action/abstract/storage.cpp index e24dd299a23b..19d7bfb03156 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/storage.cpp +++ b/ydb/core/tx/columnshard/blobs_action/abstract/storage.cpp @@ -1,5 +1,7 @@ #include "storage.h" +#include + namespace NKikimr::NOlap { bool TCommonBlobsTracker::IsBlobInUsage(const NOlap::TUnifiedBlobId& blobId) const { @@ -42,4 +44,8 @@ void IBlobsStorageOperator::Stop() { Stopped = true; } +const NSplitter::TSplitSettings& IBlobsStorageOperator::GetBlobSplitSettings() const { + return NYDBTest::TControllers::GetColumnShardController()->GetBlobSplitSettings(DoGetBlobSplitSettings()); } + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/storage.h b/ydb/core/tx/columnshard/blobs_action/abstract/storage.h index 6263e66f1515..3497599c1ed2 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/storage.h +++ b/ydb/core/tx/columnshard/blobs_action/abstract/storage.h @@ -1,16 +1,16 @@ #pragma once +#include "gc.h" +#include "read.h" #include "remove.h" #include "write.h" -#include "read.h" -#include "gc.h" #include -#include #include +#include #include +#include #include -#include namespace NKikimr::NOlap { @@ -18,9 +18,11 @@ class TCommonBlobsTracker: public IBlobInUseTracker { private: // List of blobs that are used by in-flight requests THashMap BlobsUseCount; + protected: virtual bool DoUseBlob(const TUnifiedBlobId& blobId) override; virtual bool DoFreeBlob(const TUnifiedBlobId& blobId) override; + public: virtual bool IsBlobInUsage(const NOlap::TUnifiedBlobId& blobId) const override; virtual void OnBlobFree(const TUnifiedBlobId& blobId) = 0; @@ -34,8 +36,10 @@ class IBlobsStorageOperator { YDB_READONLY(bool, Stopped, false); std::shared_ptr Counters; YDB_ACCESSOR_DEF(std::shared_ptr, SharedBlobs); + protected: - virtual std::shared_ptr DoStartDeclareRemovingAction(const std::shared_ptr& counters) = 0; + virtual std::shared_ptr DoStartDeclareRemovingAction( + const std::shared_ptr& counters) = 0; virtual std::shared_ptr DoStartWritingAction() = 0; virtual std::shared_ptr DoStartReadingAction() = 0; virtual bool DoLoad(IBlobManagerDb& dbBlobs) = 0; @@ -67,16 +71,13 @@ class IBlobsStorageOperator { IBlobsStorageOperator(const TString& storageId, const std::shared_ptr& sharedBlobs) : SelfTabletId(sharedBlobs->GetSelfTabletId()) , StorageId(storageId) - , SharedBlobs(sharedBlobs) - { + , SharedBlobs(sharedBlobs) { Counters = std::make_shared(storageId); } void Stop(); - const NSplitter::TSplitSettings& GetBlobSplitSettings() const { - return DoGetBlobSplitSettings(); - } + const NSplitter::TSplitSettings& GetBlobSplitSettings() const; virtual TTabletsByBlob GetBlobsToDelete() const = 0; virtual bool HasToDelete(const TUnifiedBlobId& blobId, const TTabletId initiatorTabletId) const = 0; @@ -120,7 +121,8 @@ class IBlobsStorageOperator { } [[nodiscard]] std::shared_ptr CreateGC() { - NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("storage_id", GetStorageId())("tablet_id", GetSelfTabletId()); + NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)( + "storage_id", GetStorageId())("tablet_id", GetSelfTabletId()); if (CurrentGCAction && CurrentGCAction->IsInProgress()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_BLOBS)("event", "gc_in_progress"); return nullptr; @@ -137,4 +139,4 @@ class IBlobsStorageOperator { virtual bool IsReady() const = 0; }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/blobs_action/local/storage.h b/ydb/core/tx/columnshard/blobs_action/local/storage.h index 142c0700f0b4..85accbe03847 100644 --- a/ydb/core/tx/columnshard/blobs_action/local/storage.h +++ b/ydb/core/tx/columnshard/blobs_action/local/storage.h @@ -53,4 +53,4 @@ class TOperator: public IBlobsStorageOperator { } }; -} +} // namespace NKikimr::NOlap::NBlobOperations::NLocal diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 18fce9cf0624..62abfcf9be9a 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -419,7 +419,8 @@ NKikimr::TConclusionStatus TIndexInfo::AppendIndex(const THashMap chunk = index->BuildIndex(originalData, recordsCount, *this); auto opStorage = operators->GetOperatorVerified(index->GetStorageId()); if ((i64)chunk->GetPackedSize() > opStorage->GetBlobSplitSettings().GetMaxBlobSize()) { - return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ") bigger than limit (" + + return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ":" + ::ToString(chunk->GetPackedSize()) + ":" + + ::ToString(recordsCount) + ") bigger than limit (" + ::ToString(opStorage->GetBlobSplitSettings().GetMaxBlobSize()) + ")"); } if (index->GetStorageId() == IStoragesManager::LocalMetadataStorageId) { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp index 6ac8fc0891a4..3a833a465ac8 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp @@ -3,9 +3,10 @@ #include #include #include -#include #include +#include #include +#include #include #include @@ -60,7 +61,7 @@ TConclusion> ISnapshotSchema::Normali } if (restoreColumnIds.contains(columnId)) { AFL_VERIFY(!!GetExternalDefaultValueVerified(columnId) || GetIndexInfo().IsNullableVerified(columnId))("column_name", - GetIndexInfo().GetColumnName(columnId, false))("id", columnId); + GetIndexInfo().GetColumnName(columnId, false))("id", columnId); result->AddField(resultField, GetColumnLoaderVerified(columnId)->BuildDefaultAccessor(batch->num_rows())).Validate(); } } @@ -324,7 +325,8 @@ TConclusion ISnapshotSchema::PrepareForWrite(c TGeneralSerializedSlice slice(chunks, schemaDetails, splitterCounters); std::vector blobs; - if (!slice.GroupBlobs(blobs, NSplitter::TEntityGroups(NSplitter::TSplitSettings(), NBlobOperations::TGlobal::DefaultStorageId))) { + if (!slice.GroupBlobs(blobs, NSplitter::TEntityGroups(NYDBTest::TControllers::GetColumnShardController()->GetBlobSplitSettings(), + NBlobOperations::TGlobal::DefaultStorageId))) { return TConclusionStatus::Fail("cannot split data for appropriate blobs size"); } auto constructor = diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h index e75ce41b85fb..8192a2fb8cf9 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h @@ -38,12 +38,32 @@ class TFixStringBitsStorage { return (bytesCount + ((bitsCount % 8) ? 1 : 0)) * 8; } + TString DebugString() const { + TStringBuilder sb; + ui32 count1 = 0; + ui32 count0 = 0; + for (ui32 i = 0; i < GetSizeBits(); ++i) { + if (Get(i)) { +// sb << 1 << " "; + ++count1; + } else { +// sb << 0 << " "; + ++count0; + } +// if (i % 20 == 0) { +// sb << i << " "; +// } + } + sb << GetSizeBits() << "=" << count0 << "[0]+" << count1 << "[1]"; + return sb; + } + template TFixStringBitsStorage(const TBitsVector& bitsVector) : TFixStringBitsStorage(TSizeDetector::GetSize(bitsVector)) { ui32 byteIdx = 0; ui8 byteCurrent = 0; - ui8 shiftCurrent = 0; + ui8 shiftCurrent = 1; for (ui32 i = 0; i < TSizeDetector::GetSize(bitsVector); ++i) { if (i && i % 8 == 0) { Data[byteIdx] = (char)byteCurrent; diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp index 5eec032ad45f..9eed96423754 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp @@ -21,12 +21,15 @@ bool TFilterChecker::DoCheckImpl(const std::vector& blobs) const { for (auto&& blob : blobs) { TFixStringBitsStorage bits(blob); bool found = true; + TStringBuilder sb; for (auto&& i : HashValues) { + sb << i % bits.GetSizeBits() << ","; if (!bits.Get(i % bits.GetSizeBits())) { found = false; break; } } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("size", bits.GetSizeBits())("found", found)("hashes", sb)("details", bits.DebugString()); if (found) { // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("size", bArray.length())("data", bArray.ToString())("index_id", GetIndexId()); return true; diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/const.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/const.cpp index c6b4378157b1..7f5af16eab37 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/const.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/const.cpp @@ -4,6 +4,10 @@ namespace NKikimr::NOlap::NIndexes::NBloomNGramm { +TString TConstants::GetRecordsCountIntervalString() { + return TStringBuilder() << "[" << MinRecordsCount << ", " << MaxRecordsCount << "]"; +} + TString TConstants::GetHashesCountIntervalString() { return TStringBuilder() << "[" << MinHashesCount << ", " << MaxHashesCount << "]"; } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/const.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/const.h index 1c0e56028806..2718cc0a37b2 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/const.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/const.h @@ -10,6 +10,12 @@ class TConstants { static constexpr ui32 MaxHashesCount = 8; static constexpr ui32 MinFilterSizeBytes = 128; static constexpr ui32 MaxFilterSizeBytes = 1 << 20; + static constexpr ui32 MinRecordsCount = 128; + static constexpr ui32 MaxRecordsCount = 1000000; + + static bool CheckRecordsCount(const ui32 value) { + return MinRecordsCount <= value && value <= MaxRecordsCount; + } static bool CheckNGrammSize(const ui32 value) { return MinNGrammSize <= value && value <= MaxNGrammSize; @@ -26,6 +32,7 @@ class TConstants { static TString GetHashesCountIntervalString(); static TString GetFilterSizeBytesIntervalString(); static TString GetNGrammSizeIntervalString(); + static TString GetRecordsCountIntervalString(); }; } // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp index 5d43b0500dfb..8929d9fd4c53 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp @@ -15,7 +15,7 @@ std::shared_ptr TIndexConstructor::DoCreateIndexMeta( } const ui32 columnId = columnInfo->GetId(); return std::make_shared(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnId, - HashesCount, FilterSizeBytes, NGrammSize); + HashesCount, FilterSizeBytes, NGrammSize, RecordsCount); } TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { @@ -29,6 +29,14 @@ TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonVal return TConclusionStatus::Fail("empty column_name in bloom ngramm filter features"); } + if (!jsonInfo["records_count"].IsUInteger()) { + return TConclusionStatus::Fail("records_count have to be in bloom filter features as uint field"); + } + RecordsCount = jsonInfo["records_count"].GetUInteger(); + if (!TConstants::CheckRecordsCount(RecordsCount)) { + return TConclusionStatus::Fail("records_count have to be in bloom ngramm filter in interval " + TConstants::GetRecordsCountIntervalString()); + } + if (!jsonInfo["ngramm_size"].IsUInteger()) { return TConclusionStatus::Fail("ngramm_size have to be in bloom filter features as uint field"); } @@ -64,6 +72,10 @@ NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKiki return TConclusionStatus::Fail(errorMessage); } auto& bFilter = proto.GetBloomNGrammFilter(); + RecordsCount = bFilter.GetRecordsCount(); + if (!TConstants::CheckRecordsCount(RecordsCount)) { + return TConclusionStatus::Fail("RecordsCount have to be in " + TConstants::GetRecordsCountIntervalString()); + } NGrammSize = bFilter.GetNGrammSize(); if (!TConstants::CheckNGrammSize(NGrammSize)) { return TConclusionStatus::Fail("NGrammSize have to be in " + TConstants::GetNGrammSizeIntervalString()); @@ -86,6 +98,7 @@ NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKiki void TIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const { auto* filterProto = proto.MutableBloomNGrammFilter(); filterProto->SetColumnName(ColumnName); + filterProto->SetRecordsCount(RecordsCount); filterProto->SetNGrammSize(NGrammSize); filterProto->SetFilterSizeBytes(FilterSizeBytes); filterProto->SetHashesCount(HashesCount); diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h index bf666370393d..209b1a5de8f0 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h @@ -13,6 +13,7 @@ class TIndexConstructor: public IIndexMetaConstructor { ui32 NGrammSize = 3; ui32 FilterSizeBytes = 512; ui32 HashesCount = 2; + ui32 RecordsCount = 10000; static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); protected: diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp index 58fc206c5302..9f22ea0934d6 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp @@ -22,28 +22,26 @@ class TNGrammBuilder { template class THashesBuilder { public: - static ui64 Build(const ui8* data, ui64& h) { - h = h ^ uint64_t(*data); - h = h * 16777619; - return THashesBuilder::Build(data + 1, h); + static ui64 Build(const ui8* data, const ui64 h) { + return THashesBuilder::Build(data + 1, (h ^ uint64_t(*data)) * 16777619); } }; template <> class THashesBuilder<0> { public: - static ui64 Build(const ui8* /*data*/, ui64& hash) { + static ui64 Build(const ui8* /*data*/, const ui64 hash) { return hash; } }; template class THashesCountSelector { + static constexpr ui64 HashStart = (ui64)HashIdx * (ui64)2166136261; public: template static void BuildHashes(const ui8* data, TActor& actor) { - ui64 hash = (ui64)2166136261 * (ui64)HashIdx; - actor(THashesBuilder::Build(data, hash)); + actor(THashesBuilder::Build(data, HashStart)); THashesCountSelector::BuildHashes(data, actor); } }; @@ -212,13 +210,22 @@ class TVectorInserterPower2 { } }; -TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 /*recordsCount*/) const { +TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const { AFL_VERIFY(reader.GetColumnsCount() == 1)("count", reader.GetColumnsCount()); TNGrammBuilder builder(HashesCount); TDynBitMap bitMap; - const ui32 size = FilterSizeBytes * 8; - bitMap.Reserve(FilterSizeBytes * 8); + ui32 size = FilterSizeBytes * 8; + if ((size & (size - 1)) == 0) { + ui32 recordsCountBase = RecordsCount; + while (recordsCountBase < recordsCount && size * 2 <= TConstants::MaxFilterSizeBytes) { + size <<= 1; + recordsCountBase *= 2; + } + } else { + size *= ((recordsCount <= RecordsCount) ? 1.0 : (1.0 * recordsCount / RecordsCount)); + } + bitMap.Reserve(size * 8); const auto doFillFilter = [&](auto& inserter) { for (reader.Start(); reader.IsCorrect();) { diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h index 562c0471f1a7..1e9135e8c9d7 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h @@ -12,20 +12,33 @@ class TIndexMeta: public TIndexByColumns { std::shared_ptr ResultSchema; ui32 NGrammSize = 3; ui32 FilterSizeBytes = 512; + ui32 RecordsCount = 10000; ui32 HashesCount = 2; static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); void Initialize() { AFL_VERIFY(!ResultSchema); std::vector> fields = {std::make_shared("", arrow::boolean())}; ResultSchema = std::make_shared(fields); - AFL_VERIFY(HashesCount > 0); - AFL_VERIFY(FilterSizeBytes > 0); - AFL_VERIFY(NGrammSize > 2); + AFL_VERIFY(TConstants::CheckHashesCount(HashesCount)); + AFL_VERIFY(TConstants::CheckFilterSizeBytes(FilterSizeBytes)); + AFL_VERIFY(TConstants::CheckNGrammSize(NGrammSize)); + AFL_VERIFY(TConstants::CheckRecordsCount(RecordsCount)); } protected: - virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& /*newMeta*/) const override { - return TConclusionStatus::Fail("not supported"); + virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const override { + const auto* bMeta = dynamic_cast(&newMeta); + if (!bMeta) { + return TConclusionStatus::Fail( + "cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName()); + } + if (HashesCount != bMeta->HashesCount) { + return TConclusionStatus::Fail("cannot modify hashes count"); + } + if (NGrammSize != bMeta->NGrammSize) { + return TConclusionStatus::Fail("cannot modify ngramm size"); + } + return TBase::CheckSameColumnsForModification(newMeta); } virtual void DoFillIndexCheckers(const std::shared_ptr& info, const NSchemeShard::TOlapSchema& schema) const override; @@ -35,16 +48,22 @@ class TIndexMeta: public TIndexByColumns { AFL_VERIFY(TBase::DoDeserializeFromProto(proto)); AFL_VERIFY(proto.HasBloomNGrammFilter()); auto& bFilter = proto.GetBloomNGrammFilter(); + if (bFilter.HasRecordsCount()) { + RecordsCount = bFilter.GetRecordsCount(); + if (!TConstants::CheckRecordsCount(RecordsCount)) { + return false; + } + } HashesCount = bFilter.GetHashesCount(); - if (HashesCount < 1 || 10 < HashesCount) { + if (!TConstants::CheckHashesCount(HashesCount)) { return false; } NGrammSize = bFilter.GetNGrammSize(); - if (NGrammSize < 3) { + if (!TConstants::CheckNGrammSize(NGrammSize)) { return false; } FilterSizeBytes = bFilter.GetFilterSizeBytes(); - if (FilterSizeBytes < 128) { + if (!TConstants::CheckFilterSizeBytes(FilterSizeBytes)) { return false; } if (!bFilter.HasColumnId() || !bFilter.GetColumnId()) { @@ -56,10 +75,12 @@ class TIndexMeta: public TIndexByColumns { } virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override { auto* filterProto = proto.MutableBloomNGrammFilter(); - AFL_VERIFY(NGrammSize >= 3); - AFL_VERIFY(FilterSizeBytes >= 128); - AFL_VERIFY(HashesCount >= 1); + AFL_VERIFY(TConstants::CheckNGrammSize(NGrammSize)); + AFL_VERIFY(TConstants::CheckFilterSizeBytes(FilterSizeBytes)); + AFL_VERIFY(TConstants::CheckHashesCount(HashesCount)); + AFL_VERIFY(TConstants::CheckRecordsCount(RecordsCount)); AFL_VERIFY(ColumnIds.size() == 1); + filterProto->SetRecordsCount(RecordsCount); filterProto->SetNGrammSize(NGrammSize); filterProto->SetFilterSizeBytes(FilterSizeBytes); filterProto->SetHashesCount(HashesCount); @@ -69,10 +90,11 @@ class TIndexMeta: public TIndexByColumns { public: TIndexMeta() = default; TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId, const ui32 hashesCount, - const ui32 filterSizeBytes, const ui32 nGrammSize) + const ui32 filterSizeBytes, const ui32 nGrammSize, const ui32 recordsCount) : TBase(indexId, indexName, { columnId }, storageId) , NGrammSize(nGrammSize) , FilterSizeBytes(filterSizeBytes) + , RecordsCount(recordsCount) , HashesCount(hashesCount) { Initialize(); diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 3f3a035f6817..85975a5dc822 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -334,7 +334,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, st for (auto&& i : changes->GetAppendedPortions()) { blobsCount += i.GetBlobs().size(); } - UNIT_ASSERT_VALUES_EQUAL(blobsCount, 1); // add 2 columns: planStep, txId + AFL_VERIFY(blobsCount == 5 || blobsCount == 1)("count", blobsCount); AddIdsToBlobs(changes->MutableAppendedPortions(), blobs, step); diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index 7dba87d3bb0b..4867ac18b7ad 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -67,7 +67,8 @@ class TWritingBlob { if (BlobSize + batch.GetSplittedBlobs().GetSize() < 8 * 1024 * 1024) { Ranges.emplace_back(&batch); BlobSize += batch.GetSplittedBlobs().GetSize(); - batch.SetRange(TBlobRange(TUnifiedBlobId(0, 0, 0, 0, 0, 0, BlobSize), BlobData.size(), batch.GetSplittedBlobs().GetSize())); + batch.SetRange(TBlobRange( + TUnifiedBlobId(0, 0, 0, 0, 0, 0, BlobSize), BlobSize - batch.GetSplittedBlobs().GetSize(), batch.GetSplittedBlobs().GetSize())); BlobData.emplace_back(batch.GetSplittedBlobs().GetData()); return true; } else { diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 94b4eca7e4d2..7e3d7d11b289 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -143,6 +144,9 @@ class ICSController { virtual ui64 DoGetMemoryLimitScanPortion(const ui64 defaultValue) const { return defaultValue; } + virtual const NOlap::NSplitter::TSplitSettings& DoGetBlobSplitSettings(const NOlap::NSplitter::TSplitSettings& defaultValue) const { + return defaultValue; + } private: inline static const NKikimrConfig::TColumnShardConfig DefaultConfig = {}; @@ -155,6 +159,11 @@ class ICSController { } public: + const NOlap::NSplitter::TSplitSettings& GetBlobSplitSettings( + const NOlap::NSplitter::TSplitSettings& defaultValue = Default()) { + return DoGetBlobSplitSettings(defaultValue); + } + virtual void OnRequestTracingChanges( const std::set& /*snapshotsToSave*/, const std::set& /*snapshotsToRemove*/) { } diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index 1430c3a002c0..356eb913527a 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -25,6 +25,7 @@ class TController: public TReadOnlyController { YDB_ACCESSOR_DEF(std::optional, OverrideMaxReadStaleness); YDB_ACCESSOR(std::optional, OverrideMemoryLimitForPortionReading, 100); YDB_ACCESSOR(std::optional, OverrideLimitForPortionsMetadataAsk, 1); + YDB_ACCESSOR(std::optional, OverrideBlobSplitSettings, NOlap::NSplitter::TSplitSettings::BuildForTests()); YDB_ACCESSOR_DEF(std::optional, OverrideBlobPutResultOnWriteValue); @@ -135,7 +136,15 @@ class TController: public TReadOnlyController { THashSet SharingIds; protected: - virtual ::NKikimr::NColumnShard::TBlobPutResult::TPtr OverrideBlobPutResultOnCompaction(const ::NKikimr::NColumnShard::TBlobPutResult::TPtr original, const NOlap::TWriteActionsCollection& actions) const override; + virtual const NOlap::NSplitter::TSplitSettings& DoGetBlobSplitSettings(const NOlap::NSplitter::TSplitSettings& defaultValue) const override { + if (OverrideBlobSplitSettings) { + return *OverrideBlobSplitSettings; + } else { + return defaultValue; + } + } + virtual ::NKikimr::NColumnShard::TBlobPutResult::TPtr OverrideBlobPutResultOnCompaction( + const ::NKikimr::NColumnShard::TBlobPutResult::TPtr original, const NOlap::TWriteActionsCollection& actions) const override; virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const override { return OverrideLimitForPortionsMetadataAsk.value_or(defaultValue); diff --git a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp index 112bfb87b6ea..5a0ae23efce6 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp @@ -10,15 +10,16 @@ namespace NKikimr::NOlap { -std::optional> TBuildSlicesTask::BuildSlices() { +std::optional> TBuildSlicesTask::BuildSlices() { if (!OriginalBatch->num_rows()) { return std::vector(); } - NArrow::TBatchSplitttingContext context(NColumnShard::TLimits::GetMaxBlobSize()); + const auto splitSettings = NYDBTest::TControllers::GetColumnShardController()->GetBlobSplitSettings(); + NArrow::TBatchSplitttingContext context(splitSettings.GetMaxBlobSize()); context.SetFieldsForSpecialKeys(WriteData.GetPrimaryKeySchema()); auto splitResult = NArrow::SplitByBlobSize(OriginalBatch, context); if (splitResult.IsFail()) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD_WRITE)( + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)( "event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage()); return {}; } diff --git a/ydb/core/tx/columnshard/splitter/settings.h b/ydb/core/tx/columnshard/splitter/settings.h index 6f9f843cf874..208227cdb628 100644 --- a/ydb/core/tx/columnshard/splitter/settings.h +++ b/ydb/core/tx/columnshard/splitter/settings.h @@ -26,6 +26,10 @@ class TSplitSettings { YDB_ACCESSOR(i64, MaxPortionSize, DefaultMaxPortionSize); public: + static TSplitSettings BuildForTests(const double scaleKff = 1) { + return TSplitSettings().SetMaxBlobSize(1024 * 10 * scaleKff).SetMinBlobSize(256 * 10 * scaleKff); + } + ui64 GetExpectedRecordsCountOnPage() const { return 1.5 * MinRecordsCount; } diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index ced3252f5962..bafd74ff8fe2 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -553,6 +553,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); csControllerGuard->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); csControllerGuard->SetOverrideMaxReadStaleness(TDuration::Max()); + csControllerGuard->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings()); TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -2423,6 +2424,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); csDefaultControllerGuard->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation); csDefaultControllerGuard->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + csDefaultControllerGuard->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings()); TTester::Setup(runtime); runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_INFO);