Skip to content

Commit

Permalink
bloom ngramms speed up (ydb-platform#12982)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 26, 2024
1 parent 24d1f57 commit 2f62452
Show file tree
Hide file tree
Showing 28 changed files with 345 additions and 154 deletions.
25 changes: 23 additions & 2 deletions ydb/core/formats/arrow/hash/calcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

namespace NKikimr::NArrow::NHash {

void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer) {
namespace {
template <class TStreamCalcer>
void AppendFieldImpl(const std::shared_ptr<arrow::Scalar>& scalar, TStreamCalcer& hashCalcer) {
AFL_VERIFY(scalar);
NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
Expand All @@ -28,7 +30,8 @@ void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TSt
});
}

void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) {
template <class TStreamCalcer>
void AppendFieldImpl(const std::shared_ptr<arrow::Array>& array, const int row, TStreamCalcer& hashCalcer) {
NArrow::SwitchType(array->type_id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using T = typename TWrap::T;
Expand All @@ -49,6 +52,24 @@ void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int ro
});
}

} // namespace

// Feeds `scalar` into the streaming hash calcer `hashCalcer`.
// Public overload for NXX64::TStreamStringHashCalcer; it only forwards to the
// file-local AppendFieldImpl template, which holds the per-type logic.
void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer) {
AppendFieldImpl(scalar, hashCalcer);
}

// Feeds `scalar` into the streaming hash calcer `hashCalcer`.
// Public overload for NXX64::TStreamStringHashCalcer_H3; it only forwards to the
// file-local AppendFieldImpl template, which holds the per-type logic.
void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer_H3& hashCalcer) {
AppendFieldImpl(scalar, hashCalcer);
}

// Feeds the value of `array` at index `row` into the streaming hash calcer `hashCalcer`.
// Public overload for NXX64::TStreamStringHashCalcer; `array` and `row` are forwarded
// unchanged to the file-local AppendFieldImpl template, which holds the per-type logic.
void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) {
AppendFieldImpl(array, row, hashCalcer);
}

// Feeds the value of `array` at index `row` into the streaming hash calcer `hashCalcer`.
// Public overload for NXX64::TStreamStringHashCalcer_H3; `array` and `row` are forwarded
// unchanged to the file-local AppendFieldImpl template, which holds the per-type logic.
void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer_H3& hashCalcer) {
AppendFieldImpl(array, row, hashCalcer);
}

std::optional<std::vector<ui64>> TXX64::Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const {
std::vector<std::shared_ptr<arrow::Array>> columns = GetColumns(batch);
if (columns.empty()) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/formats/arrow/hash/calcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class TXX64 {

static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer);
static void AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer);
static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer_H3& hashCalcer);
static void AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer_H3& hashCalcer);
static ui64 CalcHash(const std::shared_ptr<arrow::Scalar>& scalar);
std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const;

Expand Down
1 change: 0 additions & 1 deletion ydb/core/formats/arrow/permutations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <ydb/library/actors/core/log.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
#include <contrib/libs/xxhash/xxhash.h>

namespace NKikimr::NArrow {

Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
}
{
ResetZeroLevel(csController);
ui32 requestsCount = 100;
ui32 requestsCount = 300;
for (ui32 i = 0; i < requestsCount; ++i) {
const ui32 idx = RandomNumber<ui32>(uids.size());
const auto query = [](const TString& res, const TString& uid, const ui32 level) {
Expand All @@ -494,12 +494,12 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
{
ResetZeroLevel(csController);
ui32 requestsCount = 100;
ui32 requestsCount = 300;
for (ui32 i = 0; i < requestsCount; ++i) {
const ui32 idx = RandomNumber<ui32>(uids.size());
const auto query = [](const TString& res, const TString& uid, const ui32 level) {
Expand All @@ -511,13 +511,13 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)(
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
"approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
{
ResetZeroLevel(csController);
ui32 requestsCount = 100;
ui32 requestsCount = 300;
for (ui32 i = 0; i < requestsCount; ++i) {
const ui32 idx = RandomNumber<ui32>(uids.size());
const auto query = [](const TString& res, const TString& uid, const ui32 level) {
Expand All @@ -529,7 +529,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)(
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
"approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
for (auto&& i : packs) {
TGeneralSerializedSlice slicePrimary(std::move(i));
auto dataWithSecondary = resultFiltered->GetIndexInfo()
.AppendIndexes(slicePrimary.GetPortionChunksToHash(), SaverContext.GetStoragesManager())
.AppendIndexes(slicePrimary.GetPortionChunksToHash(), SaverContext.GetStoragesManager(), slicePrimary.GetRecordsCount())
.DetachResult();
TGeneralSerializedSlice slice(dataWithSecondary.GetExternalData(), schemaDetails, Context.Counters.SplitterCounters);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ std::optional<TWritePortionInfoWithBlobsResult> TReadPortionInfoWithBlobs::SyncP
TIndexInfo::TSecondaryData secondaryData;
secondaryData.MutableExternalData() = entityChunksNew;
for (auto&& i : to->GetIndexInfo().GetIndexes()) {
to->GetIndexInfo().AppendIndex(entityChunksNew, i.first, storages, secondaryData).Validate();
to->GetIndexInfo().AppendIndex(entityChunksNew, i.first, storages, source.PortionInfo.GetPortionInfo().GetRecordsCount(), secondaryData).Validate();
}

const NSplitter::TEntityGroups groups = source.PortionInfo.GetPortionInfo().GetEntityGroupsByStorageId(targetTier, *storages, to->GetIndexInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ class TFetchingStepSignals: public NColumnShard::TCommonCountersOwner {
public:
TFetchingStepSignals(NColumnShard::TCommonCountersOwner&& owner)
: TBase(std::move(owner))
, DurationCounter(TBase::GetDeriviative("duration_ms"))
, BytesCounter(TBase::GetDeriviative("bytes_ms")) {
, DurationCounter(TBase::GetDeriviative("Duration/Us"))
, BytesCounter(TBase::GetDeriviative("Bytes/Count")) {
}

void AddDuration(const TDuration d) const {
DurationCounter->Add(d.MilliSeconds());
DurationCounter->Add(d.MicroSeconds());
}

void AddBytes(const ui32 v) const {
Expand All @@ -56,7 +56,7 @@ class TFetchingStepsSignalsCollection: public NColumnShard::TCommonCountersOwner

public:
TFetchingStepsSignalsCollection()
: TBase("scan_steps") {
: TBase("ScanSteps") {
}

static TFetchingStepSignals GetSignals(const TString& name) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,11 @@ std::shared_ptr<arrow::Scalar> TIndexInfo::GetColumnExternalDefaultValueVerified
}

NKikimr::TConclusionStatus TIndexInfo::AppendIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& originalData,
const ui32 indexId, const std::shared_ptr<IStoragesManager>& operators, TSecondaryData& result) const {
const ui32 indexId, const std::shared_ptr<IStoragesManager>& operators, const ui32 recordsCount, TSecondaryData& result) const {
auto it = Indexes.find(indexId);
AFL_VERIFY(it != Indexes.end());
auto& index = it->second;
std::shared_ptr<IPortionDataChunk> chunk = index->BuildIndex(originalData, *this);
std::shared_ptr<IPortionDataChunk> chunk = index->BuildIndex(originalData, recordsCount, *this);
auto opStorage = operators->GetOperatorVerified(index->GetStorageId());
if ((i64)chunk->GetPackedSize() > opStorage->GetBlobSplitSettings().GetMaxBlobSize()) {
return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ") bigger than limit (" +
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/scheme/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,11 @@ struct TIndexInfo: public IIndexInfo {
};

[[nodiscard]] TConclusion<TSecondaryData> AppendIndexes(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& primaryData,
const std::shared_ptr<IStoragesManager>& operators) const {
const std::shared_ptr<IStoragesManager>& operators, const ui32 recordsCount) const {
TSecondaryData result;
result.MutableExternalData() = primaryData;
for (auto&& i : Indexes) {
auto conclusion = AppendIndex(primaryData, i.first, operators, result);
auto conclusion = AppendIndex(primaryData, i.first, operators, recordsCount, result);
if (conclusion.IsFail()) {
return conclusion;
}
Expand All @@ -329,7 +329,7 @@ struct TIndexInfo: public IIndexInfo {
std::shared_ptr<NIndexes::NCountMinSketch::TIndexMeta> GetIndexMetaCountMinSketch(const std::set<ui32>& columnIds) const;

[[nodiscard]] TConclusionStatus AppendIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& originalData,
const ui32 indexId, const std::shared_ptr<IStoragesManager>& operators, TSecondaryData& result) const;
const ui32 indexId, const std::shared_ptr<IStoragesManager>& operators, const ui32 recordsCount, TSecondaryData& result) const;

/// Returns an id of the column located by name. The name should exists in the schema.
ui32 GetColumnIdVerified(const std::string& name) const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class IIndexMeta {
YDB_READONLY(ui32, IndexId, 0);
YDB_READONLY(TString, StorageId, IStoragesManager::DefaultStorageId);
protected:
virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const = 0;
virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data,
const ui32 recordsCount, const TIndexInfo& indexInfo) const = 0;
virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const = 0;
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0;
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0;
Expand Down Expand Up @@ -67,8 +68,8 @@ class IIndexMeta {

virtual ~IIndexMeta() = default;

std::shared_ptr<IPortionDataChunk> BuildIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
return DoBuildIndex(data, indexInfo);
std::shared_ptr<IPortionDataChunk> BuildIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const ui32 recordsCount, const TIndexInfo& indexInfo) const {
return DoBuildIndex(data, recordsCount, indexInfo);
}

void FillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const {
Expand Down
29 changes: 28 additions & 1 deletion ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,39 @@ class TFixStringBitsStorage {
: Data(data)
{}

// Rounds `bitsCount` up to the nearest multiple of 8, i.e. to a whole number of bytes
// expressed in bits. Values that are already a multiple of 8 are returned unchanged.
static ui32 GrowBitsCountToByte(const ui32 bitsCount) {
    const ui32 tailBits = bitsCount % 8;
    if (!tailBits) {
        return bitsCount;
    }
    // Drop the partial byte and add one full byte instead.
    return bitsCount - tailBits + 8;
}

TFixStringBitsStorage(const std::vector<bool>& bitsVector)
: TFixStringBitsStorage(bitsVector.size()) {
ui32 byteIdx = 0;
ui8 byteCurrent = 0;
ui8 shiftCurrent = 0;
for (ui32 i = 0; i < bitsVector.size(); ++i) {
if (i && i % 8 == 0) {
Data[byteIdx] = (char)byteCurrent;
byteCurrent = 0;
shiftCurrent = 1;
++byteIdx;
}
if (bitsVector[i]) {
byteCurrent += shiftCurrent;
}
shiftCurrent = (shiftCurrent << 1);
}
if (byteCurrent) {
Data[byteIdx] = (char)byteCurrent;
}
}

// Returns the capacity of the storage in bits: 8 bits for every byte held in Data.
ui32 GetSizeBits() const {
return Data.size() * 8;
}

TFixStringBitsStorage(const ui32 sizeBits)
: Data(sizeBits / 8 + ((sizeBits % 8) ? 1 : 0), '\0') {
: Data(GrowBitsCountToByte(sizeBits) / 8, '\0') {
}

void Set(const bool val, const ui32 idx) {
Expand Down
34 changes: 14 additions & 20 deletions ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,21 @@

namespace NKikimr::NOlap::NIndexes {

TString TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const {
std::set<ui64> hashes;
{
NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(0);
TString TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const {
const ui32 bitsCount = TFixStringBitsStorage::GrowBitsCountToByte(HashesCount * recordsCount / std::log(2));
std::vector<bool> filterBits(bitsCount, false);
for (ui32 i = 0; i < HashesCount; ++i) {
NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 hashCalcer(i);
for (reader.Start(); reader.IsCorrect(); reader.ReadNext()) {
hashCalcer.Start();
for (auto&& i : reader) {
NArrow::NHash::TXX64::AppendField(i.GetCurrentChunk(), i.GetCurrentRecordIndex(), hashCalcer);
}
hashes.emplace(hashCalcer.Finish());
filterBits[hashCalcer.Finish() % bitsCount] = true;
}
}

const ui32 bitsCount = HashesCount * hashes.size() / std::log(2);
TFixStringBitsStorage bits(bitsCount);
const auto pred = [&bits](const ui64 hash) {
bits.Set(true, hash % bits.GetSizeBits());
};
BuildHashesSet(hashes, pred);
return bits.GetData();
return TFixStringBitsStorage(filterBits).GetData();
}

void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const {
Expand All @@ -51,15 +46,14 @@ void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataF
continue;
}
std::set<ui64> hashes;
const auto pred = [&hashes](const ui64 hash) {
hashes.emplace(hash);
};
NArrow::NHash::NXX64::TStreamStringHashCalcer calcer(0);
calcer.Start();
for (auto&& i : foundColumns) {
NArrow::NHash::TXX64::AppendField(i.second, calcer);
for (ui32 i = 0; i < HashesCount; ++i) {
NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 calcer(i);
calcer.Start();
for (auto&& i : foundColumns) {
NArrow::NHash::TXX64::AppendField(i.second, calcer);
}
hashes.emplace(calcer.Finish());
}
BuildHashesSet(calcer.Finish(), pred);
branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class TBloomIndexMeta: public TIndexByColumns {
}
virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override;

virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader) const override;
virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const override;

virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override {
AFL_VERIFY(TBase::DoDeserializeFromProto(proto));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include "const.h"

#include <util/string/builder.h>

namespace NKikimr::NOlap::NIndexes::NBloomNGramm {

// Renders the hashes-count limits as the text "[MinHashesCount, MaxHashesCount]".
TString TConstants::GetHashesCountIntervalString() {
    TStringBuilder interval;
    interval << "[" << MinHashesCount << ", " << MaxHashesCount << "]";
    return interval;
}

// Renders the filter-size limits (in bytes) as the text "[MinFilterSizeBytes, MaxFilterSizeBytes]".
TString TConstants::GetFilterSizeBytesIntervalString() {
    TStringBuilder interval;
    interval << "[" << MinFilterSizeBytes << ", " << MaxFilterSizeBytes << "]";
    return interval;
}

// Renders the n-gram size limits as the text "[MinNGrammSize, MaxNGrammSize]".
TString TConstants::GetNGrammSizeIntervalString() {
    TStringBuilder interval;
    interval << "[" << MinNGrammSize << ", " << MaxNGrammSize << "]";
    return interval;
}

} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h>
namespace NKikimr::NOlap::NIndexes::NBloomNGramm {

// Limits for the NBloomNGramm index parameters (n-gram size, hashes count and
// filter size in bytes), with helpers that validate a value against them.
// Every range is inclusive on both ends.
class TConstants {
private:
    // True when lowerBound <= value <= upperBound.
    static bool IsWithin(const ui32 value, const ui32 lowerBound, const ui32 upperBound) {
        return value >= lowerBound && value <= upperBound;
    }

public:
    static constexpr ui32 MinNGrammSize = 3;
    static constexpr ui32 MaxNGrammSize = 8;
    static constexpr ui32 MinHashesCount = 1;
    static constexpr ui32 MaxHashesCount = 8;
    static constexpr ui32 MinFilterSizeBytes = 128;
    static constexpr ui32 MaxFilterSizeBytes = 1 << 20;

    // Returns true when `value` lies in [MinNGrammSize, MaxNGrammSize].
    static bool CheckNGrammSize(const ui32 value) {
        return IsWithin(value, MinNGrammSize, MaxNGrammSize);
    }

    // Returns true when `value` lies in [MinHashesCount, MaxHashesCount].
    static bool CheckHashesCount(const ui32 value) {
        return IsWithin(value, MinHashesCount, MaxHashesCount);
    }

    // Returns true when `value` lies in [MinFilterSizeBytes, MaxFilterSizeBytes].
    static bool CheckFilterSizeBytes(const ui32 value) {
        return IsWithin(value, MinFilterSizeBytes, MaxFilterSizeBytes);
    }

    // Textual "[min, max]" forms of the ranges above.
    static TString GetHashesCountIntervalString();
    static TString GetFilterSizeBytesIntervalString();
    static TString GetNGrammSizeIntervalString();
};

} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
Loading

0 comments on commit 2f62452

Please sign in to comment.