Skip to content

Commit

Permalink
Correct schemas adaptation (#9425)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 18, 2024
1 parent e6a5701 commit d8ee31d
Show file tree
Hide file tree
Showing 21 changed files with 321 additions and 133 deletions.
14 changes: 12 additions & 2 deletions ydb/core/formats/arrow/arrow_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,18 @@ bool IsSortedAndUnique(const std::shared_ptr<arrow::RecordBatch>& batch,
}
}

std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
// Reorders `batch` according to the permutation computed from the explicit key columns `sortingKey`.
// `andUnique` is passed through to both MakeSortPermutation and Reorder.
// When MakeSortPermutation yields a null permutation the incoming batch is returned as is
// (presumably "nothing to reorder" -- confirm against MakeSortPermutation).
std::shared_ptr<arrow::RecordBatch> SortBatch(
    const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::shared_ptr<arrow::Array>>& sortingKey, const bool andUnique) {
    auto permutation = MakeSortPermutation(sortingKey, andUnique);
    if (!permutation) {
        return batch;
    }
    return Reorder(batch, permutation, andUnique);
}

std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey,
const bool andUnique) {
auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique);
if (sortPermutation) {
return Reorder(batch, sortPermutation, andUnique);
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/formats/arrow/arrow_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& b
std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob,
const std::shared_ptr<arrow::Schema>& schema);

std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
std::shared_ptr<arrow::RecordBatch> SortBatch(
const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
std::shared_ptr<arrow::RecordBatch> SortBatch(
const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::shared_ptr<arrow::Array>>& sortingKey, const bool andUnique);
bool IsSorted(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& sortingKey,
bool desc = false);
Expand Down
37 changes: 24 additions & 13 deletions ydb/core/formats/arrow/permutations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,25 @@

namespace NKikimr::NArrow {

std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
auto keyBatch = TColumnOperator().VerifyIfAbsent().Adapt(batch, sortingKey).DetachResult();
auto keyColumns = std::make_shared<TArrayVec>(keyBatch->columns());
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::vector<std::shared_ptr<arrow::Array>>& keyColumns, const bool andUnique) {
std::optional<i64> count;
for (auto&& i : keyColumns) {
AFL_VERIFY(i);
if (!count) {
count = i->length();
} else {
AFL_VERIFY(*count == i->length());
}
}
AFL_VERIFY(count);
std::vector<TRawReplaceKey> points;
points.reserve(keyBatch->num_rows());

for (int i = 0; i < keyBatch->num_rows(); ++i) {
points.push_back(TRawReplaceKey(keyColumns.get(), i));
points.reserve(*count);
for (int i = 0; i < *count; ++i) {
points.push_back(TRawReplaceKey(&keyColumns, i));
}

bool haveNulls = false;
for (auto& column : *keyColumns) {
for (auto& column : keyColumns) {
if (HasNulls(column)) {
haveNulls = true;
break;
Expand All @@ -36,11 +43,9 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
if (haveNulls) {
std::sort(points.begin(), points.end());
} else {
std::sort(points.begin(), points.end(),
[](const TRawReplaceKey& a, const TRawReplaceKey& b) {
return a.CompareNotNull(b) == std::partial_ordering::less;
}
);
std::sort(points.begin(), points.end(), [](const TRawReplaceKey& a, const TRawReplaceKey& b) {
return a.CompareNotNull(b) == std::partial_ordering::less;
});
}

arrow::UInt64Builder builder;
Expand Down Expand Up @@ -78,6 +83,12 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
return out;
}

// Schema-based overload: adapts `batch` to the `sortingKey` schema (verifying that every key column
// is present) and delegates to the column-vector overload with the adapted batch's columns.
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch,
    const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
    TColumnOperator keyExtractor;
    keyExtractor.VerifyIfAbsent();
    // The adapted batch owns the key columns; it must stay alive for the whole delegated call.
    auto adaptedKeys = keyExtractor.Adapt(batch, sortingKey).DetachResult();
    return MakeSortPermutation(adaptedKeys->columns(), andUnique);
}

namespace {

template <class TDataContainer>
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/formats/arrow/permutations.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class THashConstructor {

};

std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(
const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);

}
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::vector<std::shared_ptr<arrow::Array>>& columns, const bool andUnique);

} // namespace NKikimr::NArrow
107 changes: 96 additions & 11 deletions ydb/core/formats/arrow/process_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,22 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(
}

template <class TDataContainer, class TStringContainer>
std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProblemsPolicy& policy,
std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EAbsentFieldPolicy& policy,
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringContainer>& columnNames) {
AFL_VERIFY(incoming);
AFL_VERIFY(columnNames.size());
auto result = ExtractColumnsValidateImpl(incoming, columnNames);
switch (policy) {
case TColumnOperator::EExtractProblemsPolicy::Verify:
case TColumnOperator::EAbsentFieldPolicy::Verify:
AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())(
"required", TColumnNameAccessor<TStringContainer>::DebugString(columnNames));
break;
case TColumnOperator::EExtractProblemsPolicy::Null:
case TColumnOperator::EAbsentFieldPolicy::Error:
if ((ui32)result->num_columns() != columnNames.size()) {
return nullptr;
}
break;
case TColumnOperator::EExtractProblemsPolicy::Skip:
case TColumnOperator::EAbsentFieldPolicy::Skip:
break;
}
return result;
Expand Down Expand Up @@ -211,8 +211,8 @@ NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
}
namespace {
template <class TDataContainer, class TSchemaImpl>
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(
const std::shared_ptr<TDataContainer>& srcBatch, const std::shared_ptr<TSchemaImpl>& dstSchema) {
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TDataContainer>& srcBatch,
const std::shared_ptr<TSchemaImpl>& dstSchema, const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) {
AFL_VERIFY(srcBatch);
AFL_VERIFY(dstSchema);
if (dstSchema->num_fields() < srcBatch->schema()->num_fields()) {
Expand All @@ -228,10 +228,20 @@ TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(
++itDst;
} else {
fieldIdx.emplace(itDst - dstSchema->fields().begin());
if (!(*itDst)->Equals(*itSrc)) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
"column_type", (*itDst)->ToString(true))("incoming_type", (*itSrc)->ToString(true));
return TConclusionStatus::Fail("incompatible column types");
if (checkFieldTypesPolicy != TColumnOperator::ECheckFieldTypesPolicy::Ignore && (*itDst)->Equals(*itSrc)) {
switch (checkFieldTypesPolicy) {
case TColumnOperator::ECheckFieldTypesPolicy::Error: {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
"column_type", (*itDst)->ToString(true))("incoming_type", (*itSrc)->ToString(true));
return TConclusionStatus::Fail("incompatible column types");
}
case TColumnOperator::ECheckFieldTypesPolicy::Verify: {
AFL_VERIFY(false)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
"column_type", (*itDst)->ToString(true))("incoming_type", (*itSrc)->ToString(true));
}
case TColumnOperator::ECheckFieldTypesPolicy::Ignore:
AFL_VERIFY(false);
}
}

++itDst;
Expand All @@ -249,7 +259,82 @@ TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(

TConclusion<TSchemaSubset> TColumnOperator::BuildSequentialSubset(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema) {
return BuildSequentialSubsetImpl(incoming, dstSchema);
return BuildSequentialSubsetImpl(incoming, dstSchema, DifferentColumnTypesPolicy);
}
namespace {
// Selects the columns of `incoming` that `nameResolver` maps onto a destination field and builds a new
// container from them, ordered by destination index and described by the matching `dstSchema` fields.
//  - nameResolver: returns the destination index for a source column name; a negative value means the
//    column has no destination and is handled according to `absentColumnPolicy`.
//  - checker: invoked with (source column index, destination index) for every resolved column; a failed
//    status is returned to the caller immediately.
//  - differentColumnTypesPolicy: reaction to a source field that is not Equals() to the destination field.
// Returns a failure with "not found any column" when no source column was resolved.
template <class TDataContainer>
TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(const std::shared_ptr<TDataContainer>& incoming,
    const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
    const std::function<i32(const std::string&)>& nameResolver,
    const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
    const TColumnOperator::EAbsentFieldPolicy absentColumnPolicy) {
    using TColumnPtr = std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>;
    // A source column together with the position it takes in the destination schema.
    struct TPlacedColumn {
        ui32 DstIndex;
        TColumnPtr Column;
    };

    AFL_VERIFY(incoming);
    AFL_VERIFY(dstSchema);

    std::vector<TPlacedColumn> placed;
    placed.reserve(incoming->num_columns());

    const auto& srcFields = incoming->schema()->fields();
    for (ui32 srcIdx = 0; srcIdx < srcFields.size(); ++srcIdx) {
        const auto& srcField = srcFields[srcIdx];
        const int dstIndex = nameResolver(srcField->name());

        if (dstIndex < 0) {
            // The source column is unknown to the destination schema.
            switch (absentColumnPolicy) {
                case TColumnOperator::EAbsentFieldPolicy::Skip: {
                    break;
                }
                case TColumnOperator::EAbsentFieldPolicy::Verify: {
                    AFL_VERIFY(false)("event", "cannot_use_incoming_batch")("reason", "absent_field")("dst_column", srcField->ToString(true));
                    break;
                }
                case TColumnOperator::EAbsentFieldPolicy::Error: {
                    AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "absent_field")(
                        "dst_column", srcField->ToString(true));
                    return TConclusionStatus::Fail("not found column '" + srcField->name() + "'");
                }
                default: {
                    AFL_VERIFY(false);
                }
            }
            continue;
        }

        const auto& dstField = dstSchema->GetFieldByIndexVerified(dstIndex);
        switch (differentColumnTypesPolicy) {
            case TColumnOperator::ECheckFieldTypesPolicy::Verify:
                AFL_VERIFY(dstField->Equals(srcField))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
                    "dst_column", dstField->ToString(true))("src_column", srcField->ToString(true));
                break;
            case TColumnOperator::ECheckFieldTypesPolicy::Error:
                if (!dstField->Equals(srcField)) {
                    AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
                        "dst_column", dstField->ToString(true))("src_column", srcField->ToString(true));
                    return TConclusionStatus::Fail("incompatible column types for '" + dstField->name() + "'");
                }
                break;
            case TColumnOperator::ECheckFieldTypesPolicy::Ignore:
                break;
        }

        // The caller-supplied check runs only after the type policy has accepted the column.
        auto checkStatus = checker(srcIdx, dstIndex);
        if (checkStatus.IsFail()) {
            return checkStatus;
        }
        placed.push_back(TPlacedColumn{ static_cast<ui32>(dstIndex), incoming->column(srcIdx) });
    }

    if (placed.empty()) {
        return TConclusionStatus::Fail("not found any column");
    }

    // The result follows the destination schema order, not the incoming one.
    std::sort(placed.begin(), placed.end(), [](const TPlacedColumn& lhs, const TPlacedColumn& rhs) {
        return lhs.DstIndex < rhs.DstIndex;
    });

    std::vector<std::shared_ptr<arrow::Field>> fields;
    std::vector<TColumnPtr> columns;
    fields.reserve(placed.size());
    columns.reserve(placed.size());
    for (auto&& item : placed) {
        fields.emplace_back(dstSchema->field(item.DstIndex));
        columns.emplace_back(item.Column);
    }
    return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), incoming->num_rows());
}
} // namespace
// RecordBatch entry point: forwards to AdaptIncomingToDestinationExtImpl, passing the caller-supplied
// `checker` and `nameResolver` together with this operator's DifferentColumnTypesPolicy and
// AbsentColumnPolicy settings.
TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::AdaptIncomingToDestinationExt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<TSchemaLite>& dstSchema,
const std::function<TConclusionStatus(const ui32, const i32)>& checker, const std::function<i32(const std::string&)>& nameResolver) const {
return AdaptIncomingToDestinationExtImpl(incoming, dstSchema, checker, nameResolver, DifferentColumnTypesPolicy, AbsentColumnPolicy);
}

} // namespace NKikimr::NArrow
41 changes: 34 additions & 7 deletions ydb/core/formats/arrow/process_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <ydb/library/conclusion/result.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
#include <functional>

namespace NKikimr::NArrow {

Expand All @@ -10,31 +11,57 @@ class TSchemaLite;

class TColumnOperator {
public:
enum class EExtractProblemsPolicy {
Null,
enum class EAbsentFieldPolicy {
Error,
Verify,
Skip
};

enum class ECheckFieldTypesPolicy {
Ignore,
Error,
Verify
};

private:
EExtractProblemsPolicy AbsentColumnPolicy = EExtractProblemsPolicy::Verify;
EAbsentFieldPolicy AbsentColumnPolicy = EAbsentFieldPolicy::Verify;
ECheckFieldTypesPolicy DifferentColumnTypesPolicy = ECheckFieldTypesPolicy::Error;

public:
TColumnOperator& NullIfAbsent() {
AbsentColumnPolicy = EExtractProblemsPolicy::Null;
TColumnOperator& VerifyOnDifferentFieldTypes() {
DifferentColumnTypesPolicy = ECheckFieldTypesPolicy::Verify;
return *this;
};

TColumnOperator& ErrorOnDifferentFieldTypes() {
DifferentColumnTypesPolicy = ECheckFieldTypesPolicy::Error;
return *this;
};

TColumnOperator& IgnoreOnDifferentFieldTypes() {
DifferentColumnTypesPolicy = ECheckFieldTypesPolicy::Ignore;
return *this;
};

TColumnOperator& ErrorIfAbsent() {
AbsentColumnPolicy = EAbsentFieldPolicy::Error;
return *this;
}

TColumnOperator& VerifyIfAbsent() {
AbsentColumnPolicy = EExtractProblemsPolicy::Verify;
AbsentColumnPolicy = EAbsentFieldPolicy::Verify;
return *this;
}

TColumnOperator& SkipIfAbsent() {
AbsentColumnPolicy = EExtractProblemsPolicy::Skip;
AbsentColumnPolicy = EAbsentFieldPolicy::Skip;
return *this;
}

TConclusion<std::shared_ptr<arrow::RecordBatch>> AdaptIncomingToDestinationExt(const std::shared_ptr<arrow::RecordBatch>& incoming,
const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const std::function<i32(const std::string&)>& nameResolver) const;

std::shared_ptr<arrow::RecordBatch> Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/io_formats/arrow/csv_arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
}

if (batch && ResultColumns.size()) {
batch = NArrow::TColumnOperator().NullIfAbsent().Extract(batch, ResultColumns);
batch = NArrow::TColumnOperator().ErrorIfAbsent().Extract(batch, ResultColumns);
if (!batch) {
errString = ErrorPrefix() + "not all result columns present";
}
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tablet_flat/flat_table_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ namespace NTable {
}
}

// Returns KeyOrder wrapped in an optional, or std::nullopt when KeyOrder holds the Max<TPos>()
// sentinel (presumably meaning the column is not part of the key -- confirm against KeyOrder's users).
std::optional<ui32> GetCorrectKeyOrder() const {
    if (KeyOrder != Max<TPos>()) {
        return KeyOrder;
    }
    return std::nullopt;
}

NTable::TTag Id = Max<TTag>();
NScheme::TTypeInfo PType;
TString PTypeMod;
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,18 @@ std::optional<ui32> IIndexInfo::GetColumnIdOptional(const std::string& name) con
return {};
}

TString IIndexInfo::GetColumnName(ui32 id, bool required) const {
std::optional<ui32> IIndexInfo::GetColumnIndexOptional(const std::string& name, const ui32 shift) const {
if (name == SPEC_COL_PLAN_STEP) {
return shift + 0;
} else if (name == SPEC_COL_TX_ID) {
return shift + 1;
} else if (name == SPEC_COL_DELETE_FLAG) {
return shift + 2;
}
return {};
}

TString IIndexInfo::GetColumnName(const ui32 id, const bool required) const {
if (ESpecialColumn(id) == ESpecialColumn::PLAN_STEP) {
return SPEC_COL_PLAN_STEP;
} else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class IIndexInfo {
}

std::optional<ui32> GetColumnIdOptional(const std::string& name) const;
TString GetColumnName(ui32 id, bool required) const;
std::optional<ui32> GetColumnIndexOptional(const std::string& name, const ui32 shift) const;
TString GetColumnName(const ui32 id, const bool required) const;
static std::shared_ptr<arrow::Field> GetColumnFieldOptional(const ui32 columnId);
static std::shared_ptr<arrow::Field> GetColumnFieldVerified(const ui32 columnId);

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/scheme/column/info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ TConclusionStatus TSimpleColumnInfo::DeserializeFromProto(const NKikimrSchemeOp:

TSimpleColumnInfo::TSimpleColumnInfo(const ui32 columnId, const std::shared_ptr<arrow::Field>& arrowField,
const NArrow::NSerialization::TSerializerContainer& serializer, const bool needMinMax, const bool isSorted, const bool isNullable,
const std::shared_ptr<arrow::Scalar>& defaultValue)
const std::shared_ptr<arrow::Scalar>& defaultValue, const std::optional<ui32>& pkColumnIndex)
: ColumnId(columnId)
, PKColumnIndex(pkColumnIndex)
, ArrowField(arrowField)
, Serializer(serializer)
, NeedMinMax(needMinMax)
Expand Down
Loading

0 comments on commit d8ee31d

Please sign in to comment.