Skip to content

Commit

Permalink
fix blob range construction and index control (ydb-platform#13131)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 4, 2025
1 parent 7ded84a commit 0657aba
Show file tree
Hide file tree
Showing 23 changed files with 169 additions and 48 deletions.
5 changes: 4 additions & 1 deletion ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->SetOverrideMemoryLimitForPortionReading(1e+10);
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
Expand Down Expand Up @@ -103,6 +104,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
Expand Down Expand Up @@ -347,6 +349,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->SetOverrideMemoryLimitForPortionReading(1e+10);
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());
TLocalHelper(*Kikimr).CreateTestOlapTable();
auto tableClient = Kikimr->GetTableClient();

Expand All @@ -367,7 +370,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto alterQuery =
TStringBuilder() << Sprintf(
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_uid, TYPE=BLOOM_NGRAMM_FILTER,
FEATURES=`{"column_name" : "resource_id", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 64024}`);
FEATURES=`{"column_name" : "resource_id", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 512, "records_count" : 1024}`);
)",
StorageId.data());
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/ut/olap/sys_view_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 bytesPK1;
{
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
Expand All @@ -127,6 +128,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
}

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());
ui64 rawBytesUnpack1PK = 0;
ui64 bytesUnpack1PK = 0;
ui64 rawBytesPackAndUnpack2PK;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ message TRequestedBloomNGrammFilter {
optional uint32 FilterSizeBytes = 2;
optional uint32 HashesCount = 3;
optional string ColumnName = 4;
optional uint32 RecordsCount = 5;
}

message TRequestedMaxIndex {
Expand Down Expand Up @@ -510,6 +511,7 @@ message TBloomNGrammFilter {
optional uint32 FilterSizeBytes = 2;
optional uint32 HashesCount = 3;
optional uint32 ColumnId = 4;
optional uint32 RecordsCount = 5;
}

message TMaxIndex {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/abstract/storage.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "storage.h"

#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>

namespace NKikimr::NOlap {

bool TCommonBlobsTracker::IsBlobInUsage(const NOlap::TUnifiedBlobId& blobId) const {
Expand Down Expand Up @@ -42,4 +44,8 @@ void IBlobsStorageOperator::Stop() {
Stopped = true;
}

const NSplitter::TSplitSettings& IBlobsStorageOperator::GetBlobSplitSettings() const {
return NYDBTest::TControllers::GetColumnShardController()->GetBlobSplitSettings(DoGetBlobSplitSettings());
}

} // namespace NKikimr::NOlap
26 changes: 14 additions & 12 deletions ydb/core/tx/columnshard/blobs_action/abstract/storage.h
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
#pragma once
#include "gc.h"
#include "read.h"
#include "remove.h"
#include "write.h"
#include "read.h"
#include "gc.h"

#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/blobs_action/counters/storage.h>
#include <ydb/core/tx/columnshard/blobs_action/counters/remove_gc.h>
#include <ydb/core/tx/columnshard/blobs_action/counters/storage.h>
#include <ydb/core/tx/columnshard/data_sharing/manager/shared_blobs.h>
#include <ydb/core/tx/tiering/abstract/manager.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/core/tx/tiering/abstract/manager.h>

namespace NKikimr::NOlap {

class TCommonBlobsTracker: public IBlobInUseTracker {
private:
// List of blobs that are used by in-flight requests
THashMap<TUnifiedBlobId, i64> BlobsUseCount;

protected:
virtual bool DoUseBlob(const TUnifiedBlobId& blobId) override;
virtual bool DoFreeBlob(const TUnifiedBlobId& blobId) override;

public:
virtual bool IsBlobInUsage(const NOlap::TUnifiedBlobId& blobId) const override;
virtual void OnBlobFree(const TUnifiedBlobId& blobId) = 0;
Expand All @@ -34,8 +36,10 @@ class IBlobsStorageOperator {
YDB_READONLY(bool, Stopped, false);
std::shared_ptr<NBlobOperations::TStorageCounters> Counters;
YDB_ACCESSOR_DEF(std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>, SharedBlobs);

protected:
virtual std::shared_ptr<IBlobsDeclareRemovingAction> DoStartDeclareRemovingAction(const std::shared_ptr<NBlobOperations::TRemoveDeclareCounters>& counters) = 0;
virtual std::shared_ptr<IBlobsDeclareRemovingAction> DoStartDeclareRemovingAction(
const std::shared_ptr<NBlobOperations::TRemoveDeclareCounters>& counters) = 0;
virtual std::shared_ptr<IBlobsWritingAction> DoStartWritingAction() = 0;
virtual std::shared_ptr<IBlobsReadingAction> DoStartReadingAction() = 0;
virtual bool DoLoad(IBlobManagerDb& dbBlobs) = 0;
Expand Down Expand Up @@ -67,16 +71,13 @@ class IBlobsStorageOperator {
IBlobsStorageOperator(const TString& storageId, const std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>& sharedBlobs)
: SelfTabletId(sharedBlobs->GetSelfTabletId())
, StorageId(storageId)
, SharedBlobs(sharedBlobs)
{
, SharedBlobs(sharedBlobs) {
Counters = std::make_shared<NBlobOperations::TStorageCounters>(storageId);
}

void Stop();

const NSplitter::TSplitSettings& GetBlobSplitSettings() const {
return DoGetBlobSplitSettings();
}
const NSplitter::TSplitSettings& GetBlobSplitSettings() const;

virtual TTabletsByBlob GetBlobsToDelete() const = 0;
virtual bool HasToDelete(const TUnifiedBlobId& blobId, const TTabletId initiatorTabletId) const = 0;
Expand Down Expand Up @@ -120,7 +121,8 @@ class IBlobsStorageOperator {
}

[[nodiscard]] std::shared_ptr<IBlobsGCAction> CreateGC() {
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("storage_id", GetStorageId())("tablet_id", GetSelfTabletId());
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)(
"storage_id", GetStorageId())("tablet_id", GetSelfTabletId());
if (CurrentGCAction && CurrentGCAction->IsInProgress()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_BLOBS)("event", "gc_in_progress");
return nullptr;
Expand All @@ -137,4 +139,4 @@ class IBlobsStorageOperator {
virtual bool IsReady() const = 0;
};

}
} // namespace NKikimr::NOlap
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/blobs_action/local/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ class TOperator: public IBlobsStorageOperator {
}
};

}
} // namespace NKikimr::NOlap::NBlobOperations::NLocal
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ NKikimr::TConclusionStatus TIndexInfo::AppendIndex(const THashMap<ui32, std::vec
std::shared_ptr<IPortionDataChunk> chunk = index->BuildIndex(originalData, recordsCount, *this);
auto opStorage = operators->GetOperatorVerified(index->GetStorageId());
if ((i64)chunk->GetPackedSize() > opStorage->GetBlobSplitSettings().GetMaxBlobSize()) {
return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ") bigger than limit (" +
return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ":" + ::ToString(chunk->GetPackedSize()) + ":" +
::ToString(recordsCount) + ") bigger than limit (" +
::ToString(opStorage->GetBlobSplitSettings().GetMaxBlobSize()) + ")");
}
if (index->GetStorageId() == IStoragesManager::LocalMetadataStorageId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/engines/storage/chunks/column.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/splitter/batch_slice.h>

#include <ydb/library/formats/arrow/simple_arrays_cache.h>
Expand Down Expand Up @@ -60,7 +61,7 @@ TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> ISnapshotSchema::Normali
}
if (restoreColumnIds.contains(columnId)) {
AFL_VERIFY(!!GetExternalDefaultValueVerified(columnId) || GetIndexInfo().IsNullableVerified(columnId))("column_name",
GetIndexInfo().GetColumnName(columnId, false))("id", columnId);
GetIndexInfo().GetColumnName(columnId, false))("id", columnId);
result->AddField(resultField, GetColumnLoaderVerified(columnId)->BuildDefaultAccessor(batch->num_rows())).Validate();
}
}
Expand Down Expand Up @@ -324,7 +325,8 @@ TConclusion<TWritePortionInfoWithBlobsResult> ISnapshotSchema::PrepareForWrite(c

TGeneralSerializedSlice slice(chunks, schemaDetails, splitterCounters);
std::vector<TSplittedBlob> blobs;
if (!slice.GroupBlobs(blobs, NSplitter::TEntityGroups(NSplitter::TSplitSettings(), NBlobOperations::TGlobal::DefaultStorageId))) {
if (!slice.GroupBlobs(blobs, NSplitter::TEntityGroups(NYDBTest::TControllers::GetColumnShardController()->GetBlobSplitSettings(),
NBlobOperations::TGlobal::DefaultStorageId))) {
return TConclusionStatus::Fail("cannot split data for appropriate blobs size");
}
auto constructor =
Expand Down
22 changes: 21 additions & 1 deletion ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,32 @@ class TFixStringBitsStorage {
return (bytesCount + ((bitsCount % 8) ? 1 : 0)) * 8;
}

TString DebugString() const {
TStringBuilder sb;
ui32 count1 = 0;
ui32 count0 = 0;
for (ui32 i = 0; i < GetSizeBits(); ++i) {
if (Get(i)) {
// sb << 1 << " ";
++count1;
} else {
// sb << 0 << " ";
++count0;
}
// if (i % 20 == 0) {
// sb << i << " ";
// }
}
sb << GetSizeBits() << "=" << count0 << "[0]+" << count1 << "[1]";
return sb;
}

template <class TBitsVector>
TFixStringBitsStorage(const TBitsVector& bitsVector)
: TFixStringBitsStorage(TSizeDetector<TBitsVector>::GetSize(bitsVector)) {
ui32 byteIdx = 0;
ui8 byteCurrent = 0;
ui8 shiftCurrent = 0;
ui8 shiftCurrent = 1;
for (ui32 i = 0; i < TSizeDetector<TBitsVector>::GetSize(bitsVector); ++i) {
if (i && i % 8 == 0) {
Data[byteIdx] = (char)byteCurrent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ bool TFilterChecker::DoCheckImpl(const std::vector<TString>& blobs) const {
for (auto&& blob : blobs) {
TFixStringBitsStorage bits(blob);
bool found = true;
TStringBuilder sb;
for (auto&& i : HashValues) {
sb << i % bits.GetSizeBits() << ",";
if (!bits.Get(i % bits.GetSizeBits())) {
found = false;
break;
}
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("size", bits.GetSizeBits())("found", found)("hashes", sb)("details", bits.DebugString());
if (found) {
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("size", bArray.length())("data", bArray.ToString())("index_id", GetIndexId());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

namespace NKikimr::NOlap::NIndexes::NBloomNGramm {

TString TConstants::GetRecordsCountIntervalString() {
return TStringBuilder() << "[" << MinRecordsCount << ", " << MaxRecordsCount << "]";
}

TString TConstants::GetHashesCountIntervalString() {
return TStringBuilder() << "[" << MinHashesCount << ", " << MaxHashesCount << "]";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ class TConstants {
static constexpr ui32 MaxHashesCount = 8;
static constexpr ui32 MinFilterSizeBytes = 128;
static constexpr ui32 MaxFilterSizeBytes = 1 << 20;
static constexpr ui32 MinRecordsCount = 128;
static constexpr ui32 MaxRecordsCount = 1000000;

static bool CheckRecordsCount(const ui32 value) {
return MinRecordsCount <= value && value <= MaxRecordsCount;
}

static bool CheckNGrammSize(const ui32 value) {
return MinNGrammSize <= value && value <= MaxNGrammSize;
Expand All @@ -26,6 +32,7 @@ class TConstants {
static TString GetHashesCountIntervalString();
static TString GetFilterSizeBytesIntervalString();
static TString GetNGrammSizeIntervalString();
static TString GetRecordsCountIntervalString();
};

} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ std::shared_ptr<IIndexMeta> TIndexConstructor::DoCreateIndexMeta(
}
const ui32 columnId = columnInfo->GetId();
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnId,
HashesCount, FilterSizeBytes, NGrammSize);
HashesCount, FilterSizeBytes, NGrammSize, RecordsCount);
}

TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
Expand All @@ -29,6 +29,14 @@ TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonVal
return TConclusionStatus::Fail("empty column_name in bloom ngramm filter features");
}

if (!jsonInfo["records_count"].IsUInteger()) {
return TConclusionStatus::Fail("records_count have to be in bloom filter features as uint field");
}
RecordsCount = jsonInfo["records_count"].GetUInteger();
if (!TConstants::CheckRecordsCount(RecordsCount)) {
return TConclusionStatus::Fail("records_count have to be in bloom ngramm filter in interval " + TConstants::GetRecordsCountIntervalString());
}

if (!jsonInfo["ngramm_size"].IsUInteger()) {
return TConclusionStatus::Fail("ngramm_size have to be in bloom filter features as uint field");
}
Expand Down Expand Up @@ -64,6 +72,10 @@ NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKiki
return TConclusionStatus::Fail(errorMessage);
}
auto& bFilter = proto.GetBloomNGrammFilter();
RecordsCount = bFilter.GetRecordsCount();
if (!TConstants::CheckRecordsCount(RecordsCount)) {
return TConclusionStatus::Fail("RecordsCount have to be in " + TConstants::GetRecordsCountIntervalString());
}
NGrammSize = bFilter.GetNGrammSize();
if (!TConstants::CheckNGrammSize(NGrammSize)) {
return TConclusionStatus::Fail("NGrammSize have to be in " + TConstants::GetNGrammSizeIntervalString());
Expand All @@ -86,6 +98,7 @@ NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKiki
void TIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const {
auto* filterProto = proto.MutableBloomNGrammFilter();
filterProto->SetColumnName(ColumnName);
filterProto->SetRecordsCount(RecordsCount);
filterProto->SetNGrammSize(NGrammSize);
filterProto->SetFilterSizeBytes(FilterSizeBytes);
filterProto->SetHashesCount(HashesCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class TIndexConstructor: public IIndexMetaConstructor {
ui32 NGrammSize = 3;
ui32 FilterSizeBytes = 512;
ui32 HashesCount = 2;
ui32 RecordsCount = 10000;
static inline auto Registrator = TFactory::TRegistrator<TIndexConstructor>(GetClassNameStatic());

protected:
Expand Down
Loading

0 comments on commit 0657aba

Please sign in to comment.