Skip to content

Commit

Permalink
Merge e5ace8d into b524a82
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 18, 2024
2 parents b524a82 + e5ace8d commit c290e0a
Show file tree
Hide file tree
Showing 17 changed files with 495 additions and 305 deletions.
32 changes: 17 additions & 15 deletions ydb/core/tx/columnshard/engines/predicate/container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,59 +115,61 @@ bool TPredicateContainer::CrossRanges(const TPredicateContainer& ext) {
}
}

std::optional<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateFrom(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo) {
TConclusion<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateFrom(
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema) {
if (!object || object->Empty()) {
return TPredicateContainer(NArrow::ECompareType::GREATER_OR_EQUAL);
} else {
if (!object->Good()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not good 'from' predicate");
return {};
return TConclusionStatus::Fail("not good 'from' predicate");
}
if (!object->IsFrom()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "'from' predicate not is from");
return {};
return TConclusionStatus::Fail("'from' predicate not is from");
}
if (indexInfo) {
if (pkSchema) {
auto cNames = object->ColumnNames();
i32 countSortingFields = 0;
for (i32 i = 0; i < indexInfo->GetReplaceKey()->num_fields(); ++i) {
if (i < (int)cNames.size() && cNames[i] == indexInfo->GetReplaceKey()->field(i)->name()) {
for (i32 i = 0; i < pkSchema->num_fields(); ++i) {
if (i < (int)cNames.size() && cNames[i] == pkSchema->field(i)->name()) {
++countSortingFields;
} else {
break;
}
}
Y_ABORT_UNLESS(countSortingFields == object->Batch->num_columns());
AFL_VERIFY(countSortingFields == object->Batch->num_columns())("count", countSortingFields)("object", object->Batch->num_columns());
}
return TPredicateContainer(object, indexInfo ? ExtractKey(*object, indexInfo->GetReplaceKey()) : nullptr);
return TPredicateContainer(object, pkSchema ? ExtractKey(*object, pkSchema) : nullptr);
}
}

std::optional<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateTo(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo) {
TConclusion<TPredicateContainer> TPredicateContainer::BuildPredicateTo(
std::shared_ptr<TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema) {
if (!object || object->Empty()) {
return TPredicateContainer(NArrow::ECompareType::LESS_OR_EQUAL);
} else {
if (!object->Good()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not good 'to' predicate");
return {};
return TConclusionStatus::Fail("not good 'to' predicate");
}
if (!object->IsTo()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "'to' predicate not is to");
return {};
return TConclusionStatus::Fail("'to' predicate not is to");
}
if (indexInfo) {
if (pkSchema) {
auto cNames = object->ColumnNames();
i32 countSortingFields = 0;
for (i32 i = 0; i < indexInfo->GetReplaceKey()->num_fields(); ++i) {
if (i < (int)cNames.size() && cNames[i] == indexInfo->GetReplaceKey()->field(i)->name()) {
for (i32 i = 0; i < pkSchema->num_fields(); ++i) {
if (i < (int)cNames.size() && cNames[i] == pkSchema->field(i)->name()) {
++countSortingFields;
} else {
break;
}
}
Y_ABORT_UNLESS(countSortingFields == object->Batch->num_columns());
}
return TPredicateContainer(object, indexInfo ? TPredicateContainer::ExtractKey(*object, indexInfo->GetReplaceKey()) : nullptr);
return TPredicateContainer(object, pkSchema ? TPredicateContainer::ExtractKey(*object, pkSchema) : nullptr);
}
}

Expand Down
19 changes: 14 additions & 5 deletions ydb/core/tx/columnshard/engines/predicate/container.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#pragma once
#include "predicate.h"

#include <ydb/core/formats/arrow/arrow_filter.h>
#include <ydb/core/formats/arrow/replace_key.h>

#include <ydb/library/accessor/accessor.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>

#include <optional>

namespace NKikimr::NOlap {
Expand Down Expand Up @@ -45,6 +49,9 @@ class TPredicateContainer {
}

public:
// Returns the comparison kind stored in this container.
NArrow::ECompareType GetCompareType() const {
    return this->CompareType;
}

const std::shared_ptr<NArrow::TReplaceKey>& GetReplaceKey() const {
return ReplaceKey;
Expand All @@ -55,8 +62,8 @@ class TPredicateContainer {
}

template <class TArrayColumn>
std::optional<typename TArrayColumn::value_type> Get(const ui32 colIndex, const ui32 rowIndex,
const std::optional<typename TArrayColumn::value_type> defaultValue = {}) const {
std::optional<typename TArrayColumn::value_type> Get(
const ui32 colIndex, const ui32 rowIndex, const std::optional<typename TArrayColumn::value_type> defaultValue = {}) const {
if (!Object) {
return defaultValue;
} else {
Expand All @@ -80,13 +87,15 @@ class TPredicateContainer {
return TPredicateContainer(NArrow::ECompareType::GREATER_OR_EQUAL);
}

static std::optional<TPredicateContainer> BuildPredicateFrom(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo);
static TConclusion<TPredicateContainer> BuildPredicateFrom(
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema);

// Creates a 'to' container that holds no predicate, only the LESS_OR_EQUAL compare type.
static TPredicateContainer BuildNullPredicateTo() {
    const NArrow::ECompareType compareType = NArrow::ECompareType::LESS_OR_EQUAL;
    return TPredicateContainer(compareType);
}

static std::optional<TPredicateContainer> BuildPredicateTo(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo);
static TConclusion<TPredicateContainer> BuildPredicateTo(
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema);

NKikimr::NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const {
if (!Object) {
Expand All @@ -96,4 +105,4 @@ class TPredicateContainer {
}
};

}
} // namespace NKikimr::NOlap
147 changes: 128 additions & 19 deletions ydb/core/tx/columnshard/engines/predicate/filter.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#include "filter.h"

#include <ydb/core/formats/arrow/serializer/native.h>

#include <ydb/library/actors/core/log.h>

namespace NKikimr::NOlap {
Expand All @@ -14,43 +17,50 @@ NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(const arrow::Datum&
return result;
}

bool TPKRangesFilter::Add(std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const TIndexInfo* indexInfo) {
TConclusionStatus TPKRangesFilter::Add(
std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const std::shared_ptr<arrow::Schema>& pkSchema) {
if ((!f || f->Empty()) && (!t || t->Empty())) {
return true;
return TConclusionStatus::Success();
}
auto fromContainerConclusion = TPredicateContainer::BuildPredicateFrom(f, pkSchema);
if (fromContainerConclusion.IsFail()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "incorrect from container")(
"from", fromContainerConclusion.GetErrorMessage());
return fromContainerConclusion;
}
auto fromContainer = TPredicateContainer::BuildPredicateFrom(f, indexInfo);
auto toContainer = TPredicateContainer::BuildPredicateTo(t, indexInfo);
if (!fromContainer || !toContainer) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "incorrect from/to containers")("from", !!fromContainer)("to", !!toContainer);
return false;
auto toContainerConclusion = TPredicateContainer::BuildPredicateTo(t, pkSchema);
if (toContainerConclusion.IsFail()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "incorrect to container")(
"from", toContainerConclusion.GetErrorMessage());
return toContainerConclusion;
}
if (SortedRanges.size() && !FakeRanges) {
if (ReverseFlag) {
if (fromContainer->CrossRanges(SortedRanges.front().GetPredicateTo())) {
if (fromContainerConclusion->CrossRanges(SortedRanges.front().GetPredicateTo())) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not sorted sequence");
return false;
return TConclusionStatus::Fail("not sorted sequence");
}
} else {
if (fromContainer->CrossRanges(SortedRanges.back().GetPredicateTo())) {
if (fromContainerConclusion->CrossRanges(SortedRanges.back().GetPredicateTo())) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not sorted sequence");
return false;
return TConclusionStatus::Fail("not sorted sequence");
}
}
}
auto pkRangeFilter = TPKRangeFilter::Build(std::move(*fromContainer), std::move(*toContainer));
if (!pkRangeFilter) {
return false;
auto pkRangeFilterConclusion = TPKRangeFilter::Build(fromContainerConclusion.DetachResult(), toContainerConclusion.DetachResult());
if (pkRangeFilterConclusion.IsFail()) {
return pkRangeFilterConclusion;
}
if (FakeRanges) {
FakeRanges = false;
SortedRanges.clear();
}
if (ReverseFlag) {
SortedRanges.emplace_front(std::move(*pkRangeFilter));
SortedRanges.emplace_front(pkRangeFilterConclusion.DetachResult());
} else {
SortedRanges.emplace_back(std::move(*pkRangeFilter));
SortedRanges.emplace_back(pkRangeFilterConclusion.DetachResult());
}
return true;
return TConclusionStatus::Success();
}

TString TPKRangesFilter::DebugString() const {
Expand Down Expand Up @@ -84,6 +94,15 @@ bool TPKRangesFilter::IsPortionInUsage(const TPortionInfo& info) const {
return SortedRanges.empty();
}

// Tells whether `point` is accepted by the filter: true when there are no ranges at all, or when
// at least one stored range reports CheckPoint(point).
bool TPKRangesFilter::CheckPoint(const NArrow::TReplaceKey& point) const {
    if (SortedRanges.empty()) {
        return true;
    }
    for (const auto& range : SortedRanges) {
        if (range.CheckPoint(point)) {
            return true;
        }
    }
    return false;
}

TPKRangeFilter::EUsageClass TPKRangesFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end) const {
for (auto&& i : SortedRanges) {
switch (i.IsPortionInPartialUsage(start, end)) {
Expand All @@ -99,11 +118,101 @@ TPKRangeFilter::EUsageClass TPKRangesFilter::IsPortionInPartialUsage(const NArro
}

// Creates a filter holding a single range built from the null 'from' and 'to' predicates.
// FakeRanges is initialised to true in the class declaration, and Add() replaces this placeholder
// with the first real range.
//
// reverse - stored in ReverseFlag; Add() uses it to pick the end of SortedRanges to insert at.
TPKRangesFilter::TPKRangesFilter(const bool reverse)
    : ReverseFlag(reverse) {
    auto range = TPKRangeFilter::Build(TPredicateContainer::BuildNullPredicateFrom(), TPredicateContainer::BuildNullPredicateTo());
    Y_ABORT_UNLESS(range);
    SortedRanges.emplace_back(*range);
}

// Serializes the ranges into a record batch with two rows per range: the 'from' bound followed by
// the 'to' bound.
//
// The batch schema is pkSchema plus a trailing uint32 column ".ydb_operation_type" that holds the
// bound's compare type. PK columns not covered by a bound's replace key are written as null.
//
// NOTE(review): the last column is written from NArrow::ECompareType, while
// BuildFromRecordBatchFull casts it to NKernels::EOperation. Confirm that the two enums agree
// numerically.
//
// pkSchema - primary-key schema that defines the leading columns of the result.
std::shared_ptr<arrow::RecordBatch> TPKRangesFilter::SerializeToRecordBatch(const std::shared_ptr<arrow::Schema>& pkSchema) const {
    const ui32 pkFieldsCount = (ui32)pkSchema->num_fields();
    auto fullSchema = NArrow::TStatusValidator::GetValid(
        pkSchema->AddField(pkSchema->num_fields(), std::make_shared<arrow::Field>(".ydb_operation_type", arrow::uint32())));
    auto builders = NArrow::MakeBuilders(fullSchema, SortedRanges.size() * 2);

    // Appends one row for a single bound: its key values (null-padded) and its compare type.
    const auto appendBound = [&](const auto& bound) {
        const auto& key = bound.GetReplaceKey();
        for (ui32 idx = 0; idx < pkFieldsCount; ++idx) {
            // A bound may carry no replace key at all (the container builders pass nullptr when
            // no schema is given). Such a bound is written as an all-null key row.
            if (key && idx < key->Size()) {
                AFL_VERIFY(NArrow::Append(*builders[idx], key->Column(idx), key->GetPosition()));
            } else {
                NArrow::TStatusValidator::Validate(builders[idx]->AppendNull());
            }
        }
        NArrow::Append<arrow::UInt32Type>(*builders[pkFieldsCount], (ui32)bound.GetCompareType());
    };

    for (auto&& i : SortedRanges) {
        appendBound(i.GetPredicateFrom());
        appendBound(i.GetPredicateTo());
    }
    return arrow::RecordBatch::Make(fullSchema, SortedRanges.size() * 2, NArrow::Finish(std::move(builders)));
}

// Builds a filter in which every row of `batch` becomes its own range: the row is used both as a
// GreaterEqual 'from' predicate and as a LessEqual 'to' predicate. The batch's own schema is
// passed to Add() as the PK schema, and the status returned by each Add() is passed to Validate().
//
// batch   - rows to convert, one range per row.
// reverse - forwarded to the TPKRangesFilter constructor.
std::shared_ptr<NKikimr::NOlap::TPKRangesFilter> TPKRangesFilter::BuildFromRecordBatchLines(
    const std::shared_ptr<arrow::RecordBatch>& batch, const bool reverse) {
    auto filter = std::make_shared<TPKRangesFilter>(reverse);
    ui32 rowIdx = 0;
    while (rowIdx < batch->num_rows()) {
        const auto singleRow = batch->Slice(rowIdx, 1);
        auto lowerBound = std::make_shared<NOlap::TPredicate>(NKernels::EOperation::GreaterEqual, singleRow);
        auto upperBound = std::make_shared<NOlap::TPredicate>(NKernels::EOperation::LessEqual, singleRow);
        filter->Add(lowerBound, upperBound, batch->schema()).Validate();
        ++rowIdx;
    }
    return filter;
}

// Rebuilds a filter from a batch that holds PK columns plus a uint32 ".ydb_operation_type" column.
//
// Rows are consumed as ranges:
//   * a Greater/GreaterEqual row gives the 'from' bound and the row after it (Less/LessEqual)
//     gives the 'to' bound;
//   * a single Equal row gives both bounds (GreaterEqual and LessEqual on the same row).
// Any other operation, or a 'from' row with no following row, trips AFL_VERIFY. The status of
// every Add() is passed to Validate().
//
// batch    - serialized ranges; must contain the ".ydb_operation_type" column.
// pkSchema - primary-key schema. Predicate rows are taken from the batch adapted to this schema,
//            so the operation column is not part of the predicates.
// reverse  - forwarded to the TPKRangesFilter constructor.
std::shared_ptr<NKikimr::NOlap::TPKRangesFilter> TPKRangesFilter::BuildFromRecordBatchFull(
    const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& pkSchema, const bool reverse) {
    std::shared_ptr<TPKRangesFilter> result = std::make_shared<TPKRangesFilter>(reverse);
    auto pkBatch = NArrow::TColumnOperator().Adapt(batch, pkSchema).DetachResult();
    auto c = batch->GetColumnByName(".ydb_operation_type");
    AFL_VERIFY(c);
    AFL_VERIFY(c->type_id() == arrow::Type::UINT32);
    auto cUi32 = static_pointer_cast<arrow::UInt32Array>(c);
    for (ui32 i = 0; i < batch->num_rows();) {
        std::shared_ptr<NOlap::TPredicate> pFrom;
        std::shared_ptr<NOlap::TPredicate> pTo;
        {
            // Slice the PK-only batch: the predicate must not carry the operation column.
            auto batchRow = TPredicate::CutNulls(pkBatch->Slice(i, 1));
            NKernels::EOperation op = (NKernels::EOperation)cUi32->Value(i);
            if (op == NKernels::EOperation::GreaterEqual || op == NKernels::EOperation::Greater) {
                pFrom = std::make_shared<NOlap::TPredicate>(op, batchRow);
            } else if (op == NKernels::EOperation::Equal) {
                pFrom = std::make_shared<NOlap::TPredicate>(NKernels::EOperation::GreaterEqual, batchRow);
            } else {
                AFL_VERIFY(false);
            }
            // An Equal row also serves as the 'to' bound; otherwise 'to' is on the next row.
            if (op != NKernels::EOperation::Equal) {
                ++i;
            }
        }
        // A trailing 'from' row without its 'to' row would otherwise be read out of range.
        AFL_VERIFY(i < batch->num_rows())("row", i)("rows", batch->num_rows());
        {
            auto batchRow = TPredicate::CutNulls(pkBatch->Slice(i, 1));
            NKernels::EOperation op = (NKernels::EOperation)cUi32->Value(i);
            if (op == NKernels::EOperation::LessEqual || op == NKernels::EOperation::Less) {
                pTo = std::make_shared<NOlap::TPredicate>(op, batchRow);
            } else if (op == NKernels::EOperation::Equal) {
                pTo = std::make_shared<NOlap::TPredicate>(NKernels::EOperation::LessEqual, batchRow);
            } else {
                AFL_VERIFY(false);
            }
        }
        result->Add(pFrom, pTo, pkSchema).Validate();
        // Step past the row just used as the 'to' bound so the next range starts on a new row.
        ++i;
    }
    return result;
}

// Deserializes `data` with TNativeSerializer into a record batch and builds the filter from it
// via BuildFromRecordBatchFull.
//
// data     - serialized record batch.
// pkSchema - primary-key schema forwarded to BuildFromRecordBatchFull.
// reverse  - forwarded to BuildFromRecordBatchFull.
std::shared_ptr<NKikimr::NOlap::TPKRangesFilter> TPKRangesFilter::BuildFromString(
    const TString& data, const std::shared_ptr<arrow::Schema>& pkSchema, const bool reverse) {
    const auto decodedBatch =
        NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TNativeSerializer().Deserialize(data));
    return BuildFromRecordBatchFull(decodedBatch, pkSchema, reverse);
}

// Serializes the ranges to a string: builds the record batch with SerializeToRecordBatch and
// encodes it with TNativeSerializer::SerializeFull.
//
// pkSchema - primary-key schema forwarded to SerializeToRecordBatch.
TString TPKRangesFilter::SerializeToString(const std::shared_ptr<arrow::Schema>& pkSchema) const {
    const auto rangesBatch = SerializeToRecordBatch(pkSchema);
    return NArrow::NSerialization::TNativeSerializer().SerializeFull(rangesBatch);
}

} // namespace NKikimr::NOlap
33 changes: 31 additions & 2 deletions ydb/core/tx/columnshard/engines/predicate/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ class TPKRangesFilter {
bool FakeRanges = true;
std::deque<TPKRangeFilter> SortedRanges;
bool ReverseFlag = false;

public:
TPKRangesFilter(const bool reverse);

[[nodiscard]] TConclusionStatus Add(
std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const std::shared_ptr<arrow::Schema>& pkSchema);
std::shared_ptr<arrow::RecordBatch> SerializeToRecordBatch(const std::shared_ptr<arrow::Schema>& pkSchema) const;
TString SerializeToString(const std::shared_ptr<arrow::Schema>& pkSchema) const;

// True when the filter has no stored ranges or still holds only the placeholder range
// (FakeRanges).
bool IsEmpty() const {
    if (SortedRanges.empty()) {
        return true;
    }
    return FakeRanges;
}
Expand Down Expand Up @@ -39,11 +45,10 @@ class TPKRangesFilter {

bool IsPortionInUsage(const TPortionInfo& info) const;
TPKRangeFilter::EUsageClass IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end) const;
bool CheckPoint(const NArrow::TReplaceKey& point) const;

NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;

[[nodiscard]] bool Add(std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const TIndexInfo* indexInfo);

std::set<std::string> GetColumnNames() const {
std::set<std::string> result;
for (auto&& i : SortedRanges) {
Expand All @@ -57,6 +62,30 @@ class TPKRangesFilter {
TString DebugString() const;

std::set<ui32> GetColumnIds(const TIndexInfo& indexInfo) const;

static std::shared_ptr<TPKRangesFilter> BuildFromRecordBatchLines(const std::shared_ptr<arrow::RecordBatch>& batch, const bool reverse);

static std::shared_ptr<TPKRangesFilter> BuildFromRecordBatchFull(
const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& pkSchema, const bool reverse);
static std::shared_ptr<TPKRangesFilter> BuildFromString(
const TString& data, const std::shared_ptr<arrow::Schema>& pkSchema, const bool reverse);

// Builds a filter from a protobuf message that exposes GetRanges().
//
// Each proto range is wrapped in a TSerializedTableRange, converted into a 'from'/'to' predicate
// pair with TPredicate::DeserializePredicatesRange and added to the filter.
//
// proto   - message whose GetRanges() yields the serialized ranges.
// reverse - forwarded to the TPKRangesFilter constructor.
// ydbPk   - primary-key columns, used both for deserialization and to build the Arrow PK schema.
//
// Returns the filter, or the first failure reported by Add().
template <class TProto>
static TConclusion<TPKRangesFilter> BuildFromProto(const TProto& proto, const bool reverse, const std::vector<TNameTypeInfo>& ydbPk) {
    TPKRangesFilter result(reverse);
    // The Arrow schema depends only on ydbPk, so it is built once, on the first range. With no
    // ranges it is never built.
    std::shared_ptr<arrow::Schema> pkSchema;
    for (const auto& protoRange : proto.GetRanges()) {
        if (!pkSchema) {
            pkSchema = NArrow::TStatusValidator::GetValid(NArrow::MakeArrowSchema(ydbPk));
        }
        auto fromPredicate = std::make_shared<TPredicate>();
        auto toPredicate = std::make_shared<TPredicate>();
        TSerializedTableRange serializedRange(protoRange);
        std::tie(*fromPredicate, *toPredicate) = TPredicate::DeserializePredicatesRange(serializedRange, ydbPk);
        auto status = result.Add(fromPredicate, toPredicate, pkSchema);
        if (status.IsFail()) {
            return status;
        }
    }
    return result;
}
};

}
Loading

0 comments on commit c290e0a

Please sign in to comment.