Skip to content

Commit

Permalink
Merge c0b9c4c into 961df40
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 11, 2024
2 parents 961df40 + c0b9c4c commit ee35642
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 69 deletions.
58 changes: 36 additions & 22 deletions ydb/core/formats/arrow/arrow_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ void CompositeCompare(std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatc

}

TColumnFilter::TApplyContext& TColumnFilter::TApplyContext::Slice(const ui32 start, const ui32 count) {
AFL_VERIFY(!StartPos && !Count);
StartPos = start;
Count = count;
return *this;
}

bool TColumnFilter::TIterator::Next(const ui32 size) {
Y_ABORT_UNLESS(size);
if (CurrentRemainVolume > size) {
Expand Down Expand Up @@ -230,7 +237,7 @@ bool TColumnFilter::IsTotalDenyFilter() const {
}

void TColumnFilter::Reset(const ui32 count) {
Count = 0;
RecordsCount = 0;
FilterPlain.reset();
Filter.clear();
Filter.reserve(count / 4);
Expand All @@ -240,13 +247,13 @@ void TColumnFilter::Add(const bool value, const ui32 count) {
if (!count) {
return;
}
if (Y_UNLIKELY(LastValue != value || !Count)) {
if (Y_UNLIKELY(LastValue != value || !RecordsCount)) {
Filter.emplace_back(count);
LastValue = value;
} else {
Filter.back() += count;
}
Count += count;
RecordsCount += count;
}

ui32 TColumnFilter::CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2) {
Expand Down Expand Up @@ -311,15 +318,15 @@ NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(const arrow::D
}

template <class TData>
bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const TColumnFilter::TApplyContext& context) {
if (!batch || !batch->num_rows()) {
return false;
}
AFL_VERIFY(!!startPos == !!count);
if (!filter.IsEmpty()) {
if (startPos) {
AFL_VERIFY(filter.Size() >= *startPos + *count)("filter_size", filter.Size())("start", *startPos)("count", *count);
AFL_VERIFY(*count == (size_t)batch->num_rows())("count", *count)("batch_size", batch->num_rows());
if (context.HasSlice()) {
AFL_VERIFY(filter.Size() >= *context.GetStartPos() + *context.GetCount())("filter_size", filter.Size())("start", context.GetStartPos())(
"count", context.GetCount());
AFL_VERIFY(*context.GetCount() == (size_t)batch->num_rows())("count", context.GetCount())("batch_size", batch->num_rows());
} else {
AFL_VERIFY(filter.Size() == (size_t)batch->num_rows())("filter_size", filter.Size())("batch_size", batch->num_rows());
}
Expand All @@ -331,20 +338,27 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
if (filter.IsTotalAllowFilter()) {
return true;
}
batch = NAdapter::TDataBuilderPolicy<TData>::ApplyArrowFilter(batch, filter.BuildArrowFilter(batch->num_rows(), startPos, count));
if (context.GetTrySlices() && filter.GetFilter().size() * 10 < filter.GetCount() &&
filter.GetCount() < filter.GetFilteredCount().value_or(batch->num_rows()) * 50) {
batch =
NAdapter::TDataBuilderPolicy<TData>::ApplySlicesFilter(batch, filter.BuildSlicesIterator(context.GetStartPos(), context.GetCount()));
} else {
batch = NAdapter::TDataBuilderPolicy<TData>::ApplyArrowFilter(
batch, filter.BuildArrowFilter(batch->num_rows(), context.GetStartPos(), context.GetCount()));
}
return batch->num_rows();
}

bool TColumnFilter::Apply(std::shared_ptr<TGeneralContainer>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl(*this, batch, startPos, count);
bool TColumnFilter::Apply(std::shared_ptr<TGeneralContainer>& batch, const TApplyContext& context) const {
return ApplyImpl(*this, batch, context);
}

bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl(*this, batch, startPos, count);
bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const TApplyContext& context) const {
return ApplyImpl(*this, batch, context);
}

bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl(*this, batch, startPos, count);
bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const TApplyContext& context) const {
return ApplyImpl(*this, batch, context);
}

void TColumnFilter::Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const {
Expand Down Expand Up @@ -382,9 +396,9 @@ void TColumnFilter::Apply(const ui32 expectedRecordsCount, std::vector<arrow::Da

const std::vector<bool>& TColumnFilter::BuildSimpleFilter() const {
if (!FilterPlain) {
Y_ABORT_UNLESS(Count);
Y_ABORT_UNLESS(RecordsCount);
std::vector<bool> result;
result.resize(Count, true);
result.resize(RecordsCount, true);
bool currentValue = GetStartValue();
ui32 currentPosition = 0;
for (auto&& i : Filter) {
Expand Down Expand Up @@ -450,7 +464,7 @@ class TColumnFilter::TMergerImpl {
} else if (Filter2.empty()) {
return TMergePolicy::MergeWithSimple(Filter1, Filter2.DefaultFilterValue);
} else {
Y_ABORT_UNLESS(Filter1.Count == Filter2.Count);
Y_ABORT_UNLESS(Filter1.RecordsCount == Filter2.RecordsCount);
auto it1 = Filter1.Filter.cbegin();
auto it2 = Filter2.Filter.cbegin();

Expand Down Expand Up @@ -495,7 +509,7 @@ class TColumnFilter::TMergerImpl {
TColumnFilter result = TColumnFilter::BuildAllowFilter();
std::swap(resultFilter, result.Filter);
std::swap(curCurrent, result.LastValue);
std::swap(count, result.Count);
std::swap(count, result.RecordsCount);
return result;
}
}
Expand Down Expand Up @@ -569,7 +583,7 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter
TColumnFilter result = TColumnFilter::BuildAllowFilter();
std::swap(resultFilter, result.Filter);
std::swap(curCurrent, result.LastValue);
std::swap(count, result.Count);
std::swap(count, result.RecordsCount);
return result;
}
}
Expand All @@ -588,10 +602,10 @@ TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui
std::optional<ui32> TColumnFilter::GetFilteredCount() const {
if (!FilteredCount) {
if (IsTotalAllowFilter()) {
if (!Count) {
if (!RecordsCount) {
return {};
} else {
FilteredCount = Count;
FilteredCount = RecordsCount;
}
} else if (IsTotalDenyFilter()) {
FilteredCount = 0;
Expand Down
143 changes: 116 additions & 27 deletions ydb/core/formats/arrow/arrow_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/datum.h>
#include <util/system/types.h>

#include <deque>

namespace NKikimr::NArrow {
Expand All @@ -21,14 +22,94 @@ class TColumnFilter {
private:
bool DefaultFilterValue = true;
bool LastValue = true;
ui32 Count = 0;
std::vector<ui32> Filter;
ui32 RecordsCount = 0;
YDB_READONLY_DEF(std::vector<ui32>, Filter);
mutable std::optional<std::vector<bool>> FilterPlain;
mutable std::optional<ui32> FilteredCount;
TColumnFilter(const bool defaultFilterValue)
: DefaultFilterValue(defaultFilterValue)
{
: DefaultFilterValue(defaultFilterValue) {
}

static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2);
class TMergerImpl;
void Reset(const ui32 count);
void ResetCaches() const {
FilterPlain.reset();
FilteredCount.reset();
}

public:
class TSlicesIterator {
private:
const TColumnFilter& Owner;
const std::optional<ui32> Start;
const std::optional<ui32> Count;
ui32 CurrentStartIndex = 0;
bool CurrentIsFiltered = false;
std::vector<ui32>::const_iterator CurrentIterator;
public:
TSlicesIterator(const TColumnFilter& owner, const std::optional<ui32> start, const std::optional<ui32> count)
: Start(start)
, Count(count) {
AFL_VERIFY(!!Start == !!Count);
AFL_VERIFY(Owner.GetFilter().size());
if (Start) {
AFL_VERIFY(*Start + *Count <= owner.GetRecordsCount())("start", *start)("count", *count)("size", owner.GetRecordsCount());
}
}

bool IsFiltered() const {
return CurrentIsFiltered;
}

ui32 GetStartIndex() const {
if (!Start) {
return CurrentStartIndex;
} else {
return std::max<ui32>(CurrentStartIndex, *Start);
}
}

ui32 GetSliceSize() const {
AFL_VERIFY(IsValid());
if (!Start) {
return *CurrentIterator;
} else {
const ui32 startIndex = GetStartIndex();
const ui32 finishIndex = std::min<ui32>(CurrentStartIndex + *CurrentIterator, *Start + *Count);
AFL_VERIFY(startIndex < finishIndex)("start", startIndex)("finish", finishIndex);
return finishIndex - startIndex;
}
}

void Start() {
CurrentStartIndex = 0;
CurrentIsFiltered = Owner.GetStartValue();
CurrentIterator = Owner.GetFilter().begin();
if (Start) {
while (IsValid() && CurrentStartIndex + *CurrentIterator < *Start) {
AFL_VERIFY(Next());
}
AFL_VERIFY(IsValid());
}
}

bool IsValid() const {
return CurrentIterator != Owner.GetFilter().end() && (!Start || CurrentStartIndex < *Start + *Count);
}

bool Next() {
AFL_VERIFY(IsValid());
CurrentIsFiltered = !CurrentIsFiltered;
++CurrentIterator;
return IsValid();
}

};


ui32 GetRecordsCount() const {
return RecordsCount;
}

bool GetStartValue(const bool reverse = false) const {
Expand All @@ -46,22 +127,15 @@ class TColumnFilter {
}
}

static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2);
class TMergerImpl;
void Reset(const ui32 count);
void ResetCaches() const {
FilterPlain.reset();
FilteredCount.reset();
}
public:
void Append(const TColumnFilter& filter);
void Add(const bool value, const ui32 count = 1);
std::optional<ui32> GetFilteredCount() const;
const std::vector<bool>& BuildSimpleFilter() const;
std::shared_ptr<arrow::BooleanArray> BuildArrowFilter(const ui32 expectedSize, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
std::shared_ptr<arrow::BooleanArray> BuildArrowFilter(
const ui32 expectedSize, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;

ui64 GetDataSize() const {
return Filter.capacity() * sizeof(ui32) + Count * sizeof(bool);
return Filter.capacity() * sizeof(ui32) + RecordsCount * sizeof(bool);
}

static ui64 GetPredictedMemorySize(const ui32 recordsCount) {
Expand All @@ -77,15 +151,15 @@ class TColumnFilter {
bool CurrentValue;
const i32 FinishPosition;
const i32 DeltaPosition;

public:
TString DebugString() const;

TIterator(const bool reverse, const std::vector<ui32>& filter, const bool startValue)
: FilterPointer(&filter)
, CurrentValue(startValue)
, FinishPosition(reverse ? -1 : FilterPointer->size())
, DeltaPosition(reverse ? -1 : 1)
{
, DeltaPosition(reverse ? -1 : 1) {
if (!FilterPointer->size()) {
Position = FinishPosition;
} else {
Expand Down Expand Up @@ -158,11 +232,10 @@ class TColumnFilter {
struct TAdapterLambda {
private:
TGetterLambda Getter;

public:
TAdapterLambda(const TGetterLambda& getter)
: Getter(getter)
{

: Getter(getter) {
}

bool operator[](const ui32 index) const {
Expand All @@ -175,10 +248,6 @@ class TColumnFilter {
return Reset(count, TAdapterLambda<TGetterLambda>(getter));
}

ui32 Size() const {
return Count;
}

bool IsTotalAllowFilter() const;
bool IsTotalDenyFilter() const;
bool IsEmpty() const {
Expand All @@ -199,13 +268,33 @@ class TColumnFilter {
// It makes a filter using composite predicate
static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);

bool Apply(std::shared_ptr<TGeneralContainer>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
class TApplyContext {
private:
YDB_READONLY_DEF(std::optional<ui32>, StartPos);
YDB_READONLY_DEF(std::optional<ui32>, Count);
YDB_ACCESSOR(bool, TrySlices, false);

public:
TApplyContext() = default;
bool HasSlice() const {
return !!StartPos && !!Count;
}

TApplyContext(const ui32 start, const ui32 count)
: StartPos(start)
, Count(count) {
}

TApplyContext& Slice(const ui32 start, const ui32 count);
};

bool Apply(std::shared_ptr<TGeneralContainer>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
bool Apply(std::shared_ptr<arrow::Table>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
void Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const;

// Combines filters by 'and' operator (extFilter count is true positions count in self, thought extFitler patch exactly that positions)
TColumnFilter CombineSequentialAnd(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
};

}
} // namespace NKikimr::NArrow
Loading

0 comments on commit ee35642

Please sign in to comment.