Skip to content

Commit

Permalink
refactoring for ranges control
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Aug 14, 2024
1 parent 8f1e398 commit 8e165c6
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 167 deletions.
32 changes: 17 additions & 15 deletions ydb/core/tx/columnshard/engines/predicate/container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,59 +115,61 @@ bool TPredicateContainer::CrossRanges(const TPredicateContainer& ext) {
}
}

std::optional<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateFrom(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo) {
TConclusion<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateFrom(
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema) {
if (!object || object->Empty()) {
return TPredicateContainer(NArrow::ECompareType::GREATER_OR_EQUAL);
} else {
if (!object->Good()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not good 'from' predicate");
return {};
return TConclusionStatus::Fail("not good 'from' predicate");
}
if (!object->IsFrom()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "'from' predicate not is from");
return {};
return TConclusionStatus::Fail("'from' predicate not is from");
}
if (indexInfo) {
if (pkSchema) {
auto cNames = object->ColumnNames();
i32 countSortingFields = 0;
for (i32 i = 0; i < indexInfo->GetReplaceKey()->num_fields(); ++i) {
if (i < (int)cNames.size() && cNames[i] == indexInfo->GetReplaceKey()->field(i)->name()) {
for (i32 i = 0; i < pkSchema->num_fields(); ++i) {
if (i < (int)cNames.size() && cNames[i] == pkSchema->field(i)->name()) {
++countSortingFields;
} else {
break;
}
}
Y_ABORT_UNLESS(countSortingFields == object->Batch->num_columns());
AFL_VERIFY(countSortingFields == object->Batch->num_columns())("count", countSortingFields)("object", object->Batch->num_columns());
}
return TPredicateContainer(object, indexInfo ? ExtractKey(*object, indexInfo->GetReplaceKey()) : nullptr);
return TPredicateContainer(object, pkSchema ? ExtractKey(*object, pkSchema) : nullptr);
}
}

std::optional<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateTo(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo) {
TConclusion<TPredicateContainer> TPredicateContainer::BuildPredicateTo(
std::shared_ptr<TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema) {
if (!object || object->Empty()) {
return TPredicateContainer(NArrow::ECompareType::LESS_OR_EQUAL);
} else {
if (!object->Good()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not good 'to' predicate");
return {};
return TConclusionStatus::Fail("not good 'to' predicate");
}
if (!object->IsTo()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "'to' predicate not is to");
return {};
return TConclusionStatus::Fail("'to' predicate not is to");
}
if (indexInfo) {
if (pkSchema) {
auto cNames = object->ColumnNames();
i32 countSortingFields = 0;
for (i32 i = 0; i < indexInfo->GetReplaceKey()->num_fields(); ++i) {
if (i < (int)cNames.size() && cNames[i] == indexInfo->GetReplaceKey()->field(i)->name()) {
for (i32 i = 0; i < pkSchema->num_fields(); ++i) {
if (i < (int)cNames.size() && cNames[i] == pkSchema->field(i)->name()) {
++countSortingFields;
} else {
break;
}
}
Y_ABORT_UNLESS(countSortingFields == object->Batch->num_columns());
}
return TPredicateContainer(object, indexInfo ? TPredicateContainer::ExtractKey(*object, indexInfo->GetReplaceKey()) : nullptr);
return TPredicateContainer(object, pkSchema ? TPredicateContainer::ExtractKey(*object, pkSchema) : nullptr);
}
}

Expand Down
17 changes: 11 additions & 6 deletions ydb/core/tx/columnshard/engines/predicate/container.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#pragma once
#include "predicate.h"

#include <ydb/core/formats/arrow/arrow_filter.h>
#include <ydb/core/formats/arrow/replace_key.h>

#include <ydb/library/accessor/accessor.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>

#include <optional>

namespace NKikimr::NOlap {
Expand Down Expand Up @@ -45,7 +49,6 @@ class TPredicateContainer {
}

public:

const std::shared_ptr<NArrow::TReplaceKey>& GetReplaceKey() const {
return ReplaceKey;
}
Expand All @@ -55,8 +58,8 @@ class TPredicateContainer {
}

template <class TArrayColumn>
std::optional<typename TArrayColumn::value_type> Get(const ui32 colIndex, const ui32 rowIndex,
const std::optional<typename TArrayColumn::value_type> defaultValue = {}) const {
std::optional<typename TArrayColumn::value_type> Get(
const ui32 colIndex, const ui32 rowIndex, const std::optional<typename TArrayColumn::value_type> defaultValue = {}) const {
if (!Object) {
return defaultValue;
} else {
Expand All @@ -80,13 +83,15 @@ class TPredicateContainer {
return TPredicateContainer(NArrow::ECompareType::GREATER_OR_EQUAL);
}

static std::optional<TPredicateContainer> BuildPredicateFrom(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo);
static TConclusion<TPredicateContainer> BuildPredicateFrom(
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema);

static TPredicateContainer BuildNullPredicateTo() {
return TPredicateContainer(NArrow::ECompareType::LESS_OR_EQUAL);
}

static std::optional<TPredicateContainer> BuildPredicateTo(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo);
static TConclusion<TPredicateContainer> BuildPredicateTo(
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema);

NKikimr::NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const {
if (!Object) {
Expand All @@ -96,4 +101,4 @@ class TPredicateContainer {
}
};

}
} // namespace NKikimr::NOlap
40 changes: 23 additions & 17 deletions ydb/core/tx/columnshard/engines/predicate/filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,49 @@ NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(const arrow::Datum&
return result;
}

bool TPKRangesFilter::Add(std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const TIndexInfo* indexInfo) {
TConclusionStatus TPKRangesFilter::Add(std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const std::shared_ptr<arrow::Schema>& pkSchema) {
if ((!f || f->Empty()) && (!t || t->Empty())) {
return true;
return TConclusionStatus::Success();
}
auto fromContainer = TPredicateContainer::BuildPredicateFrom(f, indexInfo);
auto toContainer = TPredicateContainer::BuildPredicateTo(t, indexInfo);
if (!fromContainer || !toContainer) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "incorrect from/to containers")("from", !!fromContainer)("to", !!toContainer);
return false;
auto fromContainerConclusion = TPredicateContainer::BuildPredicateFrom(f, pkSchema);
if (fromContainerConclusion.IsFail()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "incorrect from container")(
"from", fromContainerConclusion.GetErrorMessage());
return fromContainerConclusion;
}
auto toContainerConclusion = TPredicateContainer::BuildPredicateTo(t, pkSchema);
if (toContainerConclusion.IsFail()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "incorrect to container")(
"from", toContainerConclusion.GetErrorMessage());
return toContainerConclusion;
}
if (SortedRanges.size() && !FakeRanges) {
if (ReverseFlag) {
if (fromContainer->CrossRanges(SortedRanges.front().GetPredicateTo())) {
if (fromContainerConclusion->CrossRanges(SortedRanges.front().GetPredicateTo())) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not sorted sequence");
return false;
return TConclusionStatus::Fail("not sorted sequence");
}
} else {
if (fromContainer->CrossRanges(SortedRanges.back().GetPredicateTo())) {
if (fromContainerConclusion->CrossRanges(SortedRanges.back().GetPredicateTo())) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not sorted sequence");
return false;
return TConclusionStatus::Fail("not sorted sequence");
}
}
}
auto pkRangeFilter = TPKRangeFilter::Build(std::move(*fromContainer), std::move(*toContainer));
if (!pkRangeFilter) {
return false;
auto pkRangeFilterConclusion = TPKRangeFilter::Build(fromContainerConclusion.DetachResult(), toContainerConclusion.DetachResult());
if (pkRangeFilterConclusion.IsFail()) {
return pkRangeFilterConclusion;
}
if (FakeRanges) {
FakeRanges = false;
SortedRanges.clear();
}
if (ReverseFlag) {
SortedRanges.emplace_front(std::move(*pkRangeFilter));
SortedRanges.emplace_front(pkRangeFilterConclusion.DetachResult());
} else {
SortedRanges.emplace_back(std::move(*pkRangeFilter));
SortedRanges.emplace_back(pkRangeFilterConclusion.DetachResult());
}
return true;
return TConclusionStatus::Success();
}

TString TPKRangesFilter::DebugString() const {
Expand Down
20 changes: 19 additions & 1 deletion ydb/core/tx/columnshard/engines/predicate/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class TPKRangesFilter {

NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;

[[nodiscard]] bool Add(std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const TIndexInfo* indexInfo);
[[nodiscard]] TConclusionStatus Add(
std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const std::shared_ptr<arrow::Schema>& pkSchema);

std::set<std::string> GetColumnNames() const {
std::set<std::string> result;
Expand All @@ -57,6 +58,23 @@ class TPKRangesFilter {
TString DebugString() const;

std::set<ui32> GetColumnIds(const TIndexInfo& indexInfo) const;

template <class TProto>
static TConclusion<TPKRangesFilter> BuildFromProto(const TProto& proto, const bool reverse, const std::vector<TNameTypeInfo>& ydbPk) {
TPKRangesFilter result(reverse);
for (auto& protoRange : proto.GetRanges()) {
TSerializedTableRange range(protoRange);
auto fromPredicate = std::make_shared<TPredicate>();
auto toPredicate = std::make_shared<TPredicate>();
TSerializedTableRange serializedRange(protoRange);
std::tie(*fromPredicate, *toPredicate) = TPredicate::DeserializePredicatesRange(serializedRange, ydbPk);
auto status = result.Add(fromPredicate, toPredicate, NArrow::TStatusValidator::GetValid(NArrow::MakeArrowSchema(ydbPk)));
if (status.IsFail()) {
return status;
}
}
return result;
}
};

}
98 changes: 92 additions & 6 deletions ydb/core/tx/columnshard/engines/predicate/predicate.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
#include "predicate.h"

#include <ydb/core/formats/arrow/arrow_batch_builder.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/switch_type.h>

namespace NKikimr::NOlap {

TPredicate::TPredicate(EOperation op, std::shared_ptr<arrow::RecordBatch> batch) noexcept
: Operation(op)
, Batch(std::move(batch))
{
, Batch(std::move(batch)) {
Y_ABORT_UNLESS(IsFrom() || IsTo());
}

TPredicate::TPredicate(EOperation op, const TString& serializedBatch, const std::shared_ptr<arrow::Schema>& schema)
: Operation(op)
{
: Operation(op) {
Y_ABORT_UNLESS(IsFrom() || IsTo());
if (!serializedBatch.empty()) {
Batch = NArrow::DeserializeBatch(serializedBatch, schema);
Expand All @@ -31,7 +30,94 @@ std::vector<TString> TPredicate::ColumnNames() const {
return out;
}

IOutputStream& operator << (IOutputStream& out, const TPredicate& pred) {
std::vector<NScheme::TTypeInfo> ExtractTypes(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns) {
std::vector<NScheme::TTypeInfo> types;
types.reserve(columns.size());
for (auto& [name, type] : columns) {
types.push_back(type);
}
return types;
}

TString FromCells(const TConstArrayRef<TCell>& cells, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns) {
Y_ABORT_UNLESS(cells.size() == columns.size());
if (cells.empty()) {
return {};
}

std::vector<NScheme::TTypeInfo> types = ExtractTypes(columns);

NArrow::TArrowBatchBuilder batchBuilder;
batchBuilder.Reserve(1);
auto startStatus = batchBuilder.Start(columns);
Y_ABORT_UNLESS(startStatus.ok(), "%s", startStatus.ToString().c_str());

batchBuilder.AddRow(NKikimr::TDbTupleRef(), NKikimr::TDbTupleRef(types.data(), cells.data(), cells.size()));

auto batch = batchBuilder.FlushBatch(false);
Y_ABORT_UNLESS(batch);
Y_ABORT_UNLESS(batch->num_columns() == (int)cells.size());
Y_ABORT_UNLESS(batch->num_rows() == 1);
return NArrow::SerializeBatchNoCompression(batch);
}

std::pair<NKikimr::NOlap::TPredicate, NKikimr::NOlap::TPredicate> TPredicate::DeserializePredicatesRange(
const TSerializedTableRange& range, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns) {
std::vector<TCell> leftCells;
std::vector<std::pair<TString, NScheme::TTypeInfo>> leftColumns;
bool leftTrailingNull = false;
{
TConstArrayRef<TCell> cells = range.From.GetCells();
const size_t size = cells.size();
Y_ASSERT(size <= columns.size());
leftCells.reserve(size);
leftColumns.reserve(size);
for (size_t i = 0; i < size; ++i) {
if (!cells[i].IsNull()) {
leftCells.push_back(cells[i]);
leftColumns.push_back(columns[i]);
leftTrailingNull = false;
} else {
leftTrailingNull = true;
}
}
}

std::vector<TCell> rightCells;
std::vector<std::pair<TString, NScheme::TTypeInfo>> rightColumns;
bool rightTrailingNull = false;
{
TConstArrayRef<TCell> cells = range.To.GetCells();
const size_t size = cells.size();
Y_ASSERT(size <= columns.size());
rightCells.reserve(size);
rightColumns.reserve(size);
for (size_t i = 0; i < size; ++i) {
if (!cells[i].IsNull()) {
rightCells.push_back(cells[i]);
rightColumns.push_back(columns[i]);
rightTrailingNull = false;
} else {
rightTrailingNull = true;
}
}
}

const bool fromInclusive = range.FromInclusive || leftTrailingNull;
const bool toInclusive = range.ToInclusive && !rightTrailingNull;

TString leftBorder = FromCells(leftCells, leftColumns);
TString rightBorder = FromCells(rightCells, rightColumns);
auto leftSchema = NArrow::MakeArrowSchema(leftColumns);
Y_ASSERT(leftSchema.ok());
auto rightSchema = NArrow::MakeArrowSchema(rightColumns);
Y_ASSERT(rightSchema.ok());
return std::make_pair(
TPredicate(fromInclusive ? NKernels::EOperation::GreaterEqual : NKernels::EOperation::Greater, leftBorder, leftSchema.ValueUnsafe()),
TPredicate(toInclusive ? NKernels::EOperation::LessEqual : NKernels::EOperation::Less, rightBorder, rightSchema.ValueUnsafe()));
}

IOutputStream& operator<<(IOutputStream& out, const TPredicate& pred) {
out << NSsa::GetFunctionName(pred.Operation);

for (i32 i = 0; i < pred.Batch->num_columns(); ++i) {
Expand Down Expand Up @@ -61,4 +147,4 @@ IOutputStream& operator << (IOutputStream& out, const TPredicate& pred) {
return out;
}

} // namespace NKikimr::NOlap
} // namespace NKikimr::NOlap
Loading

0 comments on commit 8e165c6

Please sign in to comment.