diff --git a/ydb/core/formats/arrow/process_columns.cpp b/ydb/core/formats/arrow/process_columns.cpp index f363b7613a81..cdc18dd9d1db 100644 --- a/ydb/core/formats/arrow/process_columns.cpp +++ b/ydb/core/formats/arrow/process_columns.cpp @@ -207,24 +207,23 @@ NKikimr::TConclusion> TColumnOperator::Reorder( return ReorderImpl(incoming, columnNames); } namespace { -template -TConclusion BuildSequentialSubsetImpl(const std::shared_ptr& srcBatch, - const std::shared_ptr& dstSchema, const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) { +template +TConclusion BuildSequentialSubsetImpl(const std::shared_ptr& srcBatch, const TSchemaLiteView& dstSchema, + const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) { AFL_VERIFY(srcBatch); - AFL_VERIFY(dstSchema); - if (dstSchema->num_fields() < srcBatch->schema()->num_fields()) { + if (dstSchema.num_fields() < srcBatch->schema()->num_fields()) { AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns set: destination must been wider than source")( - "source", srcBatch->schema()->ToString())("destination", dstSchema->ToString()); + "source", srcBatch->schema()->ToString())("destination", dstSchema.ToString()); return TConclusionStatus::Fail("incorrect columns set: destination must been wider than source"); } std::set fieldIdx; auto itSrc = srcBatch->schema()->fields().begin(); - auto itDst = dstSchema->fields().begin(); - while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema->fields().end()) { + auto itDst = dstSchema.begin(); + while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema.end()) { if ((*itSrc)->name() != (*itDst)->name()) { ++itDst; } else { - fieldIdx.emplace(itDst - dstSchema->fields().begin()); + fieldIdx.emplace(itDst - dstSchema.begin()); if (checkFieldTypesPolicy != TColumnOperator::ECheckFieldTypesPolicy::Ignore && (*itDst)->Equals(*itSrc)) { switch (checkFieldTypesPolicy) { case TColumnOperator::ECheckFieldTypesPolicy::Error: { @@ -245,25 +244,24 @@ TConclusion BuildSequentialSubsetImpl(const std::shared_ptrfields().end() && itSrc != srcBatch->schema()->fields().end()) { + if (itDst == dstSchema.end() && itSrc != srcBatch->schema()->fields().end()) { AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns order in source set")("source", srcBatch->schema()->ToString())( - "destination", dstSchema->ToString()); + "destination", dstSchema.ToString()); return TConclusionStatus::Fail("incorrect columns order in source set"); } - return TSchemaSubset(fieldIdx, dstSchema->num_fields()); + return TSchemaSubset(fieldIdx, dstSchema.num_fields()); } } // namespace TConclusion TColumnOperator::BuildSequentialSubset( - const std::shared_ptr& incoming, const std::shared_ptr& dstSchema) { + const std::shared_ptr& incoming, const NArrow::TSchemaLiteView& dstSchema) { return BuildSequentialSubsetImpl(incoming, dstSchema, DifferentColumnTypesPolicy); } namespace { template TConclusion> AdaptIncomingToDestinationExtImpl(const std::shared_ptr& incoming, - const std::shared_ptr& dstSchema, const std::function& checker, - const std::function& nameResolver, - const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy, + const TSchemaLiteView& dstSchema, const std::function& checker, + const std::function& nameResolver, const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy, const TColumnOperator::EAbsentFieldPolicy absentColumnPolicy) { struct TFieldData { ui32 Index; @@ -273,14 +271,13 @@ TConclusion> AdaptIncomingToDestinationExtImpl(c } }; AFL_VERIFY(incoming); - AFL_VERIFY(dstSchema); std::vector resultColumns; resultColumns.reserve(incoming->num_columns()); ui32 idx = 0; for (auto& srcField : incoming->schema()->fields()) { const int dstIndex = nameResolver(srcField->name()); if (dstIndex > -1) { - const auto& dstField = dstSchema->GetFieldByIndexVerified(dstIndex); + const auto& dstField = dstSchema.GetFieldByIndexVerified(dstIndex); switch (differentColumnTypesPolicy) { case TColumnOperator::ECheckFieldTypesPolicy::Verify: AFL_VERIFY(dstField->type()->Equals(srcField->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")( @@ -322,14 +319,14 @@ TConclusion> AdaptIncomingToDestinationExtImpl(c columns.reserve(resultColumns.size()); fields.reserve(resultColumns.size()); for (auto&& i : resultColumns) { - fields.emplace_back(dstSchema->field(i.Index)); + fields.emplace_back(dstSchema.field(i.Index)); columns.emplace_back(i.Column); } return NAdapter::TDataBuilderPolicy::Build(std::make_shared(fields), std::move(columns), incoming->num_rows()); } } // namespace TConclusion> TColumnOperator::AdaptIncomingToDestinationExt( - const std::shared_ptr& incoming, const std::shared_ptr& dstSchema, + const std::shared_ptr& incoming, const TSchemaLiteView& dstSchema, const std::function& checker, const std::function& nameResolver) const { return AdaptIncomingToDestinationExtImpl(incoming, dstSchema, checker, nameResolver, DifferentColumnTypesPolicy, AbsentColumnPolicy); } diff --git a/ydb/core/formats/arrow/process_columns.h b/ydb/core/formats/arrow/process_columns.h index c4b418ada529..2eb7e77330b7 100644 --- a/ydb/core/formats/arrow/process_columns.h +++ b/ydb/core/formats/arrow/process_columns.h @@ -8,6 +8,7 @@ namespace NKikimr::NArrow { class TSchemaSubset; class TSchemaLite; +class TSchemaLiteView; class TColumnOperator { public: @@ -59,7 +60,7 @@ class TColumnOperator { } TConclusion> AdaptIncomingToDestinationExt(const std::shared_ptr& incoming, - const std::shared_ptr& dstSchema, const std::function& checker, + const TSchemaLiteView& dstSchema, const std::function& checker, const std::function& nameResolver) const; std::shared_ptr Extract( @@ -73,7 +74,7 @@ class TColumnOperator { std::shared_ptr Extract(const std::shared_ptr& incoming, const std::vector& columnNames); TConclusion BuildSequentialSubset( - const std::shared_ptr& incoming, const std::shared_ptr& dstSchema); + const std::shared_ptr& incoming, const NArrow::TSchemaLiteView& dstSchema); TConclusion> Adapt( const std::shared_ptr& incoming, const std::shared_ptr& dstSchema, TSchemaSubset* subset = nullptr); diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index f3bc2aa80e58..55cff6c401f8 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -146,7 +146,6 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBaseTablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema(); - auto schema = schemaSnapshot->GetSchema(); auto index = schemaSnapshot->GetColumnIdOptional(columnName); if (!index) { return TTxController::TProposeResult( diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 52a5f0ebd2b0..fe62107aa05f 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -108,7 +108,8 @@ class TPathFieldsInfo { if (!Schemas.contains(data.GetSchemaVersion())) { Schemas.emplace(data.GetSchemaVersion(), blobSchema); } - std::vector filteredIds = data.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().GetColumnIds(false)); + auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false); + std::vector filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end()); if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) { filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG); } @@ -245,8 +246,10 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont std::shared_ptr batch; { const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange); + + auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema(); auto batchSchema = - std::make_shared(inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().ArrowSchema()->fields())); + std::make_shared(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end())); batch = std::make_shared(NArrow::DeserializeBatch(blobData, batchSchema)); std::set columnIdsToDelete = blobSchema->GetColumnIdsToDelete(resultSchema); if (!columnIdsToDelete.empty()) { diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h index 32a9e0a3034c..2251ec2c8eef 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h @@ -118,7 +118,7 @@ class TReadMetadataBase { ISnapshotSchema::TPtr GetLoadSchemaVerified(const TPortionInfo& porition) const; - const std::shared_ptr& GetBlobSchema(const ui64 version) const { + NArrow::TSchemaLiteView GetBlobSchema(const ui64 version) const { return GetIndexVersions().GetSchemaVerified(version)->GetIndexInfo().ArrowSchema(); } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h index df07febacea0..b7d87c2b3812 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h @@ -131,15 +131,6 @@ class TReadMetadata: public TReadMetadataBase { TConclusionStatus Init( const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor); - std::vector GetColumnsOrder() const { - auto schema = GetResultSchema(); - std::vector result; - for (auto&& i : schema->GetSchema()->fields()) { - result.emplace_back(i->name()); - } - return result; - } - std::set GetEarlyFilterColumnIds() const; std::set GetPKColumnIds() const; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index c586ea83ff69..17a6643bd5f6 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -273,8 +273,8 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr& AFL_VERIFY(GetStageData().GetBlobs().size() == 1); auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first); auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion()); - auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared(CommittedBlob.GetSchemaSubset().Apply(schema->fields()))); - AFL_VERIFY(rBatch)("schema", schema->ToString()); + auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared(CommittedBlob.GetSchemaSubset().Apply(schema.begin(), schema.end()))); + AFL_VERIFY(rBatch)("schema", schema.ToString()); auto batch = std::make_shared(rBatch); std::set columnIdsToDelete = batchSchema->GetColumnIdsToDelete(resultSchema); if (!columnIdsToDelete.empty()) { diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/column_ids.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract/column_ids.cpp new file mode 100644 index 000000000000..053f717c0f96 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/column_ids.cpp @@ -0,0 +1,3 @@ +#include "column_ids.h" + +namespace NKikimr::NOlap {} diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/column_ids.h b/ydb/core/tx/columnshard/engines/scheme/abstract/column_ids.h new file mode 100644 index 000000000000..0e38152dbe38 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/column_ids.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include + +#include +#include + +#include + +namespace NKikimr::NOlap { + +class TColumnIdsView: private TNonCopyable { +private: + std::span ColumnIds; + + class TIterator: public NArrow::NUtil::TRandomAccessIteratorClone::iterator, TIterator> { + using TBase = NArrow::NUtil::TRandomAccessIteratorClone::iterator, TIterator>; + + public: + using TBase::TRandomAccessIteratorClone; + }; + +public: + template + TColumnIdsView(const It begin, const It end) + : ColumnIds(begin, end) { + } + + TIterator begin() const { + return ColumnIds.begin(); + } + + TIterator end() const { + return ColumnIds.end(); + } + + ui32 operator[](size_t idx) const { + AFL_VERIFY(idx < ColumnIds.size())("idx", idx)("size", ColumnIds.size()); + return ColumnIds[idx]; + } + + ui64 size() const { + return ColumnIds.size(); + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h index 2db58b9fe960..7102cbaefc50 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h @@ -120,11 +120,9 @@ class IIndexInfo { return result; } - [[nodiscard]] static std::vector AddSpecialFieldIds(const std::vector& baseColumnIds) { - std::vector result = baseColumnIds; + static void AddSpecialFieldIds(std::vector& baseColumnIds) { const auto& cIds = GetSystemColumnIds(); - result.insert(result.end(), cIds.begin(), cIds.end()); - return result; + baseColumnIds.insert(baseColumnIds.end(), cIds.begin(), cIds.end()); } [[nodiscard]] static std::set AddSpecialFieldIds(const std::set& baseColumnIds) { diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make b/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make index 79b12f94389e..709793c4e38d 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( index_info.cpp + column_ids.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index ea7f6feaf027..9974e027bfa3 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -28,25 +28,23 @@ ui32 TIndexInfo::GetColumnIdVerified(const std::string& name) const { } std::optional TIndexInfo::GetColumnIdOptional(const std::string& name) const { - const auto pred = [](const TNameInfo& item, const std::string& value) { - return item.GetName() < value; - }; - auto it = std::lower_bound(ColumnNames.begin(), ColumnNames.end(), name, pred); - if (it != ColumnNames.end() && it->GetName() == name) { - return it->GetColumnId(); + auto idx = GetColumnIndexOptional(name); + if (!idx) { + return std::nullopt; } - return IIndexInfo::GetColumnIdOptional(name); + AFL_VERIFY(*idx < SchemaColumnIdsWithSpecials.size()); + return SchemaColumnIdsWithSpecials[*idx]; } std::optional TIndexInfo::GetColumnIndexOptional(const std::string& name) const { - const auto pred = [](const TNameInfo& item, const std::string& value) { - return item.GetName() < value; - }; - auto it = std::lower_bound(ColumnNames.begin(), ColumnNames.end(), name, pred); - if (it != ColumnNames.end() && it->GetName() == name) { - return it->GetColumnIdx(); + auto it = std::lower_bound(ColumnIdxSortedByName.begin(), ColumnIdxSortedByName.end(), name, [this](const ui32 idx, const std::string name) { + AFL_VERIFY(idx < ColumnFeatures.size()); + return ColumnFeatures[idx]->GetColumnName() < name; + }); + if (it != ColumnIdxSortedByName.end() && SchemaWithSpecials->GetFieldByIndexVerified(*it)->name() == name) { + return *it; } - return IIndexInfo::GetColumnIndexOptional(name, ColumnNames.size()); + return std::nullopt; } TString TIndexInfo::GetColumnName(const ui32 id, bool required) const { @@ -59,11 +57,12 @@ TString TIndexInfo::GetColumnName(const ui32 id, bool required) const { } } -const std::vector& TIndexInfo::GetColumnIds(const bool withSpecial) const { +TColumnIdsView TIndexInfo::GetColumnIds(const bool withSpecial) const { if (withSpecial) { - return SchemaColumnIdsWithSpecials; + return {SchemaColumnIdsWithSpecials.begin(), SchemaColumnIdsWithSpecials.end()}; } else { - return SchemaColumnIds; + AFL_VERIFY(SpecialColumnsCount < SchemaColumnIdsWithSpecials.size()); + return {SchemaColumnIdsWithSpecials.begin(), SchemaColumnIdsWithSpecials.end() - SpecialColumnsCount}; } } @@ -76,7 +75,8 @@ std::vector TIndexInfo::GetColumnNames(const std::vector& ids) co return out; } -std::vector TIndexInfo::GetColumnSTLNames(const std::vector& ids) const { +std::vector TIndexInfo::GetColumnSTLNames(const bool withSpecial) const { + const auto ids = GetColumnIds(withSpecial); std::vector out; out.reserve(ids.size()); for (ui32 id : ids) { @@ -85,9 +85,9 @@ std::vector TIndexInfo::GetColumnSTLNames(const std::vector& return out; } -const std::shared_ptr& TIndexInfo::ArrowSchema() const { - AFL_VERIFY(Schema); - return Schema; +NArrow::TSchemaLiteView TIndexInfo::ArrowSchema() const { + const auto& schema = ArrowSchemaWithSpecials(); + return std::span>(schema->fields().begin(), schema->fields().end() - SpecialColumnsCount); } const std::shared_ptr& TIndexInfo::ArrowSchemaWithSpecials() const { @@ -121,8 +121,7 @@ void TIndexInfo::SetAllKeys(const std::shared_ptr& operators, PKColumns.emplace_back(TNameTypeInfo(it->second.Name, it->second.PType)); } - if (!Schema) { - AFL_VERIFY(!SchemaWithSpecials); + if (!SchemaWithSpecials) { InitializeCaches(operators, columns, nullptr); Precalculate(); } @@ -249,18 +248,20 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& AFL_VERIFY(PKColumnIds.empty()); { TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::Columns"); + THashMap columnIds; for (const auto& col : schema.GetColumns()) { auto tableCol = BuildColumnFromProto(col, cache); auto id = tableCol.Id; + AFL_VERIFY(columnIds.emplace(tableCol.Name, id).second); AFL_VERIFY(columns.emplace(id, std::move(tableCol)).second); } - ColumnNames = TNameInfo::BuildColumnNames(columns); for (const auto& keyName : schema.GetKeyColumnNames()) { - const ui32 columnId = GetColumnIdVerified(keyName); - auto it = columns.find(columnId); + const ui32* findColumnId = columnIds.FindPtr(keyName); + AFL_VERIFY(findColumnId); + auto it = columns.find(*findColumnId); AFL_VERIFY(it != columns.end()); it->second.KeyOrder = PKColumnIds.size(); - PKColumnIds.push_back(columnId); + PKColumnIds.push_back(*findColumnId); } } InitializeCaches(operators, columns, cache, false); @@ -347,21 +348,20 @@ void TIndexInfo::InitializeCaches(const std::shared_ptr& opera const std::shared_ptr& cache, const bool withColumnFeatures) { { TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::Schema"); - AFL_VERIFY(!Schema); - SchemaColumnIds.reserve(columns.size()); + AFL_VERIFY(!SchemaWithSpecials); + SchemaColumnIdsWithSpecials.reserve(columns.size()); for (const auto& [id, _] : columns) { - SchemaColumnIds.push_back(id); + SchemaColumnIdsWithSpecials.push_back(id); } - std::sort(SchemaColumnIds.begin(), SchemaColumnIds.end()); - auto originalFields = TIndexInfo::MakeArrowFields(columns, SchemaColumnIds, cache); - Schema = std::make_shared(originalFields); + std::sort(SchemaColumnIdsWithSpecials.begin(), SchemaColumnIdsWithSpecials.end()); + auto originalFields = TIndexInfo::MakeArrowFields(columns, SchemaColumnIdsWithSpecials, cache); IIndexInfo::AddSpecialFields(originalFields); SchemaWithSpecials = std::make_shared(originalFields); } { TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::SchemaFields"); - SchemaColumnIdsWithSpecials = IIndexInfo::AddSpecialFieldIds(SchemaColumnIds); + IIndexInfo::AddSpecialFieldIds(SchemaColumnIdsWithSpecials); } if (withColumnFeatures) { AFL_VERIFY(ColumnFeatures.empty()); @@ -457,7 +457,8 @@ std::shared_ptr TIndexInfo::GetIndexMetaC } std::vector TIndexInfo::GetEntityIds() const { - auto result = GetColumnIds(true); + const auto columnIds = GetColumnIds(true); + std::vector result(columnIds.begin(), columnIds.end()); for (auto&& i : Indexes) { result.emplace_back(i.first); } @@ -498,10 +499,8 @@ TIndexInfo::TIndexInfo(const TIndexInfo& original, const TSchemaDiffView& diff, const ui32 originalColId = original.SchemaColumnIdsWithSpecials[index]; SchemaColumnIdsWithSpecials.emplace_back(originalColId); if (!IIndexInfo::IsSpecialColumn(originalColId)) { - AFL_VERIFY(index < original.SchemaColumnIds.size()); - SchemaColumnIds.emplace_back(originalColId); - ColumnNames.emplace_back(TNameInfo(original.ColumnFeatures[index]->GetColumnName(), originalColId, ColumnNames.size())); - fields.emplace_back(original.Schema->field(index)); + AFL_VERIFY(index < original.SchemaColumnIdsWithSpecials.size() - SpecialColumnsCount); + fields.emplace_back(original.SchemaWithSpecials->field(index)); } }; @@ -509,16 +508,12 @@ TIndexInfo::TIndexInfo(const TIndexInfo& original, const TSchemaDiffView& diff, const ui32 colId = col.GetId(); AFL_VERIFY(!IIndexInfo::IsSpecialColumn(colId)); SchemaColumnIdsWithSpecials.emplace_back(colId); - SchemaColumnIds.emplace_back(colId); - ColumnNames.emplace_back(TNameInfo(col.GetName(), colId, ColumnNames.size())); auto tableCol = BuildColumnFromProto(col, cache); fields.emplace_back(BuildArrowField(tableCol, cache)); }; diff.ApplyForColumns(original.SchemaColumnIdsWithSpecials, addFromOriginal, addFromDiff); - Schema = std::make_shared(fields); IIndexInfo::AddSpecialFields(fields); SchemaWithSpecials = std::make_shared(fields); - std::sort(ColumnNames.begin(), ColumnNames.end(), TNameInfo::TNameComparator()); PKColumnIds = original.PKColumnIds; PKColumns = original.PKColumns; } @@ -566,32 +561,46 @@ TIndexInfo::TIndexInfo(const TIndexInfo& original, const TSchemaDiffView& diff, } void TIndexInfo::Precalculate() { + BuildColumnIndexByName(); UsedStorageIds = std::make_shared>(); for (auto&& i : ColumnFeatures) { UsedStorageIds->emplace(i->GetOperator()->GetStorageId()); } } +void TIndexInfo::BuildColumnIndexByName() { + const ui32 columnCount = SchemaColumnIdsWithSpecials.size(); + std::erase_if(ColumnIdxSortedByName, [columnCount](const ui32 idx) { + return idx >= columnCount; + }); + ColumnIdxSortedByName.reserve(columnCount); + for (ui32 i = 0; i < columnCount; ++i) { + ColumnIdxSortedByName.push_back(i); + } + + std::sort(ColumnIdxSortedByName.begin(), ColumnIdxSortedByName.end(), [this](const ui32 lhs, const ui32 rhs) { + return CompareColumnIdxByName(lhs, rhs); + }); +} + void TIndexInfo::Validate() const { AFL_VERIFY(!!UsedStorageIds); AFL_VERIFY(ColumnFeatures.size() == SchemaColumnIdsWithSpecials.size()); AFL_VERIFY(ColumnFeatures.size() == (ui32)SchemaWithSpecials->num_fields()); - AFL_VERIFY(ColumnFeatures.size() == (ui32)Schema->num_fields() + IIndexInfo::SpecialColumnsCount); - AFL_VERIFY(ColumnFeatures.size() == SchemaColumnIds.size() + IIndexInfo::SpecialColumnsCount); { ui32 idx = 0; - for (auto&& i : SchemaColumnIds) { + for (auto&& i : SchemaColumnIdsWithSpecials) { AFL_VERIFY(i == ColumnFeatures[idx]->GetColumnId()); - AFL_VERIFY(Schema->field(idx)->name() == ColumnFeatures[idx]->GetColumnName()); - AFL_VERIFY(Schema->field(idx)->Equals(SchemaWithSpecials->field(idx))); + AFL_VERIFY(SchemaWithSpecials->field(idx)->name() == ColumnFeatures[idx]->GetColumnName()); ++idx; } } + AFL_VERIFY(std::is_sorted(SchemaColumnIdsWithSpecials.begin(), SchemaColumnIdsWithSpecials.end())); - for (auto&& i : ColumnNames) { - AFL_VERIFY(ColumnFeatures[i.GetColumnIdx()]->GetColumnId() == i.GetColumnId()); - AFL_VERIFY(ColumnFeatures[i.GetColumnIdx()]->GetColumnName() == i.GetName()); - } + AFL_VERIFY(ColumnFeatures.size() == ColumnIdxSortedByName.size()); + AFL_VERIFY(std::is_sorted(ColumnIdxSortedByName.begin(), ColumnIdxSortedByName.end(), [this](const ui32 lhs, const ui32 rhs) { + return CompareColumnIdxByName(lhs, rhs); + })); { ui32 pkIdx = 0; diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 9d27f0015971..420347ae7803 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -16,6 +16,7 @@ #include #include #include +#include #include @@ -53,47 +54,7 @@ struct TIndexInfo: public IIndexInfo { friend class TPortionInfo; friend class TPortionDataAccessor; - class TNameInfo { - private: - YDB_READONLY_DEF(TString, Name); - YDB_READONLY(ui32, ColumnId, 0); - YDB_READONLY(ui32, ColumnIdx, 0); - - public: - struct TNameComparator { - bool operator()(const TNameInfo& l, const TNameInfo& r) const { - return l.Name < r.Name; - }; - }; - - struct TColumnIdComparator { - bool operator()(const TNameInfo& l, const TNameInfo& r) const { - return l.ColumnId < r.ColumnId; - }; - }; - - TNameInfo(const TString& name, const ui32 columnId, const ui32 columnIdx) - : Name(name) - , ColumnId(columnId) - , ColumnIdx(columnIdx) { - } - - static std::vector BuildColumnNames(const TColumns& columns) { - std::vector result; - for (auto&& i : columns) { - result.emplace_back(TNameInfo(i.second.Name, i.first, 0)); - } - std::sort(result.begin(), result.end(), TNameInfo::TColumnIdComparator()); - ui32 idx = 0; - for (auto&& i : result) { - i.ColumnIdx = idx++; - } - std::sort(result.begin(), result.end(), TNameInfo::TNameComparator()); - return result; - } - }; - - std::vector ColumnNames; + std::vector ColumnIdxSortedByName; std::vector PKColumnIds; std::vector PKColumns; @@ -107,10 +68,8 @@ struct TIndexInfo: public IIndexInfo { std::optional ScanReaderPolicyName; ui64 Version = 0; - std::vector SchemaColumnIds; std::vector SchemaColumnIdsWithSpecials; std::shared_ptr SchemaWithSpecials; - std::shared_ptr Schema; std::shared_ptr PrimaryKey; NArrow::NSerialization::TSerializerContainer DefaultSerializer = NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); @@ -153,6 +112,7 @@ struct TIndexInfo: public IIndexInfo { void Validate() const; void Precalculate(); + void BuildColumnIndexByName(); bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema, const std::shared_ptr& operators, const std::shared_ptr& cache); @@ -187,6 +147,12 @@ struct TIndexInfo: public IIndexInfo { void SetAllKeys(const std::shared_ptr& operators, const THashMap& columns); + bool CompareColumnIdxByName(const ui32 lhs, const ui32 rhs) const { + AFL_VERIFY(lhs < ColumnFeatures.size()); + AFL_VERIFY(rhs < ColumnFeatures.size()); + return ColumnFeatures[lhs]->GetColumnName() < ColumnFeatures[rhs]->GetColumnName(); + } + public: NSplitter::TEntityGroups GetEntityGroupsByStorageId(const TString& specialTier, const IStoragesManager& storages) const; std::optional GetPKColumnIndexByIndexVerified(const ui32 columnIndex) const { @@ -260,15 +226,11 @@ struct TIndexInfo: public IIndexInfo { static TIndexInfo BuildDefault(); - static TIndexInfo BuildDefault( - const std::shared_ptr& operators, const TColumns& columns, const std::vector& pkNames) { + static TIndexInfo BuildDefault(const std::shared_ptr& operators, const TColumns& columns, const std::vector& pkIds) { TIndexInfo result = BuildDefault(); - result.ColumnNames = TNameInfo::BuildColumnNames(columns); - for (auto&& i : pkNames) { - const ui32 columnId = result.GetColumnIdVerified(i); - result.PKColumnIds.emplace_back(columnId); - } + result.PKColumnIds = pkIds; result.SetAllKeys(operators, columns); + result.Validate(); return result; } @@ -392,8 +354,8 @@ struct TIndexInfo: public IIndexInfo { /// Returns names of columns defined by the specific ids. std::vector GetColumnNames(const std::vector& ids) const; - std::vector GetColumnSTLNames(const std::vector& ids) const; - const std::vector& GetColumnIds(const bool withSpecial = true) const; + std::vector GetColumnSTLNames(const bool withSpecial = true) const; + TColumnIdsView GetColumnIds(const bool withSpecial = true) const; ui32 GetColumnIdByIndexVerified(const ui32 index) const { AFL_VERIFY(index < SchemaColumnIdsWithSpecials.size()); return SchemaColumnIdsWithSpecials[index]; @@ -424,7 +386,7 @@ struct TIndexInfo: public IIndexInfo { std::vector GetColumnIds(const std::vector& columnNames) const; - const std::shared_ptr& ArrowSchema() const; + NArrow::TSchemaLiteView ArrowSchema() const; const std::shared_ptr& ArrowSchemaWithSpecials() const; bool AllowTtlOverColumn(const TString& name) const; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp index 3ca7c1ec0cbe..6ac8fc0891a4 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp @@ -83,7 +83,7 @@ TConclusion> ISnapshotSchema::PrepareForModi NArrow::TStatusValidator::Validate(incomingBatch->ValidateFull()); #endif - const std::shared_ptr dstSchema = GetIndexInfo().ArrowSchema(); + NArrow::TSchemaLiteView dstSchema = GetIndexInfo().ArrowSchema(); std::vector> pkColumns; pkColumns.resize(GetIndexInfo().GetReplaceKey()->num_fields()); ui32 pkColumnsCount = 0; @@ -102,7 +102,7 @@ TConclusion> ISnapshotSchema::PrepareForModi return TConclusionStatus::Success(); } if (pkFieldIdx) { - return TConclusionStatus::Fail("null data for pk column is impossible for '" + dstSchema->field(targetIdx)->name() + "'"); + return TConclusionStatus::Fail("null data for pk column is impossible for '" + dstSchema.field(targetIdx)->name() + "'"); } switch (mType) { case NEvWrite::EModificationType::Replace: @@ -114,7 +114,7 @@ TConclusion> ISnapshotSchema::PrepareForModi if (GetIndexInfo().GetColumnExternalDefaultValueByIndexVerified(targetIdx)) { return TConclusionStatus::Success(); } else { - return TConclusionStatus::Fail("empty field for non-default column: '" + dstSchema->field(targetIdx)->name() + "'"); + return TConclusionStatus::Fail("empty field for non-default column: '" + dstSchema.field(targetIdx)->name() + "'"); } } case NEvWrite::EModificationType::Delete: @@ -200,7 +200,7 @@ std::vector ISnapshotSchema::GetPKColumnNames() const { std::vector> ISnapshotSchema::GetAbsentFields(const std::shared_ptr& existsSchema) const { std::vector> result; - for (auto&& f : GetIndexInfo().ArrowSchema()->fields()) { + for (auto&& f : GetIndexInfo().ArrowSchema()) { if (!existsSchema->GetFieldByName(f->name())) { result.emplace_back(f); } @@ -220,9 +220,9 @@ TConclusionStatus ISnapshotSchema::CheckColumnsDefault(const std::vector> ISnapshotSchema::BuildDefaultBatch( - const std::vector>& fields, const ui32 rowsCount, const bool force) const { + const NArrow::TSchemaLiteView& schema, const ui32 rowsCount, const bool force) const { std::vector> columns; - for (auto&& i : fields) { + for (auto&& i : schema) { const ui32 columnId = GetColumnIdVerified(i->name()); auto defaultValue = GetExternalDefaultValueVerified(columnId); if (!defaultValue && !GetIndexInfo().IsNullableVerified(columnId)) { @@ -234,7 +234,7 @@ TConclusion> ISnapshotSchema::BuildDefaultBa } columns.emplace_back(NArrow::TThreadSimpleArraysCache::Get(i->type(), defaultValue, rowsCount)); } - return arrow::RecordBatch::Make(std::make_shared(fields), rowsCount, columns); + return arrow::RecordBatch::Make(std::make_shared(arrow::FieldVector(schema.begin(), schema.end())), rowsCount, columns); } std::shared_ptr ISnapshotSchema::GetExternalDefaultValueVerified(const std::string& columnName) const { @@ -288,8 +288,8 @@ TConclusion ISnapshotSchema::PrepareForWrite(c AFL_VERIFY(incomingBatch->num_rows()); auto itIncoming = incomingBatch->schema()->fields().begin(); auto itIncomingEnd = incomingBatch->schema()->fields().end(); - auto itIndex = GetIndexInfo().ArrowSchema()->fields().begin(); - auto itIndexEnd = GetIndexInfo().ArrowSchema()->fields().end(); + auto itIndex = GetIndexInfo().ArrowSchema().begin(); + auto itIndexEnd = GetIndexInfo().ArrowSchema().end(); THashMap>> chunks; std::shared_ptr schemaDetails( @@ -298,7 +298,7 @@ TConclusion ISnapshotSchema::PrepareForWrite(c while (itIncoming != itIncomingEnd && itIndex != itIndexEnd) { if ((*itIncoming)->name() == (*itIndex)->name()) { const ui32 incomingIndex = itIncoming - incomingBatch->schema()->fields().begin(); - const ui32 columnIndex = itIndex - GetIndexInfo().ArrowSchema()->fields().begin(); + const ui32 columnIndex = itIndex - GetIndexInfo().ArrowSchema().begin(); const ui32 columnId = GetIndexInfo().GetColumnIdByIndexVerified(columnIndex); auto loader = GetIndexInfo().GetColumnLoaderVerified(columnId); auto saver = GetIndexInfo().GetColumnSaver(columnId); diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h index a914ae1ab51b..825f3f7e543b 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -29,7 +30,7 @@ class ISnapshotSchema { std::shared_ptr GetColumnLoaderVerified(const std::string& columnName) const; bool IsSpecialColumnId(const ui32 columnId) const; - virtual const std::vector& GetColumnIds() const = 0; + virtual TColumnIdsView GetColumnIds() const = 0; virtual NArrow::NAccessor::TColumnSaver GetColumnSaver(const ui32 columnId) const = 0; NArrow::NAccessor::TColumnSaver GetColumnSaver(const TString& columnName) const { @@ -45,7 +46,7 @@ class ISnapshotSchema { std::shared_ptr GetExternalDefaultValueVerified(const ui32 columnId) const; TConclusion> BuildDefaultBatch( - const std::vector>& fields, const ui32 rowsCount, const bool force) const; + const NArrow::TSchemaLiteView& schema, const ui32 rowsCount, const bool force) const; TConclusionStatus CheckColumnsDefault(const std::vector>& fields) const; std::vector GetPKColumnNames() const; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h index 7a3112a12c57..417fcaa0775f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h @@ -18,8 +18,8 @@ class TFilteredSnapshotSchema: public ISnapshotSchema { TFilteredSnapshotSchema(const ISnapshotSchema::TPtr& originalSnapshot, const std::vector& columnIds); TFilteredSnapshotSchema(const ISnapshotSchema::TPtr& originalSnapshot, const std::set& columnIds); - virtual const std::vector& GetColumnIds() const override { - return ColumnIds; + virtual TColumnIdsView GetColumnIds() const override { + return {ColumnIds.begin(), ColumnIds.end()}; } TColumnSaver GetColumnSaver(const ui32 columnId) const override; std::shared_ptr GetColumnLoaderOptional(const ui32 columnId) const override; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h index 9965743257b3..5246d3926750 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h @@ -23,7 +23,7 @@ class TSnapshotSchema: public ISnapshotSchema { public: TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot); - virtual const std::vector& GetColumnIds() const override { + virtual TColumnIdsView GetColumnIds() const override { return IndexInfo.GetColumnIds(); } diff --git a/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp b/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp index e76affbf5549..dca1b7b52023 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp @@ -48,7 +48,7 @@ TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr& /*t } else { auto insertionConclusion = Context.GetActualSchema()->CheckColumnsDefault(defaultFields); auto conclusion = - Context.GetActualSchema()->BuildDefaultBatch(Context.GetActualSchema()->GetIndexInfo().ArrowSchema()->fields(), 1, true); + Context.GetActualSchema()->BuildDefaultBatch(Context.GetActualSchema()->GetIndexInfo().ArrowSchema(), 1, true); AFL_VERIFY(!conclusion.IsFail())("error", conclusion.GetErrorMessage()); auto batchDefault = conclusion.DetachResult(); NArrow::NMerger::TSortableBatchPosition pos( diff --git a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp index 823f6ac1cf3d..829f1cbe36a2 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp @@ -18,7 +18,7 @@ NKikimr::TConclusionStatus IMerger::Finish() { NKikimr::TConclusionStatus IMerger::AddExistsDataOrdered(const std::shared_ptr& data) { AFL_VERIFY(data); NArrow::NMerger::TRWSortableBatchPosition existsPosition(data, 0, Schema->GetPKColumnNames(), - Schema->GetIndexInfo().GetColumnSTLNames(Schema->GetIndexInfo().GetColumnIds(false)), false); + Schema->GetIndexInfo().GetColumnSTLNames(false), false); bool exsistFinished = !existsPosition.InitPosition(0); while (!IncomingFinished && !exsistFinished) { auto cmpResult = IncomingPosition.Compare(existsPosition); @@ -47,7 +47,7 @@ NKikimr::TConclusionStatus TUpdateMerger::OnEqualKeys(const NArrow::NMerger::TSo auto rGuard = Builder.StartRecord(); AFL_VERIFY(Schema->GetIndexInfo().GetColumnIds(false).size() == exists.GetData().GetColumns().size()) ("index", Schema->GetIndexInfo().GetColumnIds(false).size())("exists", exists.GetData().GetColumns().size()); - for (i32 columnIdx = 0; columnIdx < Schema->GetIndexInfo().ArrowSchema()->num_fields(); ++columnIdx) { + for (i32 columnIdx = 0; columnIdx < Schema->GetIndexInfo().ArrowSchema().num_fields(); ++columnIdx) { const std::optional& incomingColumnIdx = IncomingColumnRemap[columnIdx]; if (incomingColumnIdx && HasIncomingDataFlags[*incomingColumnIdx]->GetView(incoming.GetPosition())) { const ui32 idxChunk = incoming.GetData().GetPositionInChunk(*incomingColumnIdx, incoming.GetPosition()); @@ -56,18 +56,17 @@ NKikimr::TConclusionStatus TUpdateMerger::OnEqualKeys(const NArrow::NMerger::TSo const ui32 idxChunk = exists.GetData().GetPositionInChunk(columnIdx, exists.GetPosition()); rGuard.Add(*exists.GetData().GetPositionAddress(columnIdx).GetArray(), idxChunk); } - } + } return TConclusionStatus::Success(); } TUpdateMerger::TUpdateMerger(const std::shared_ptr& incoming, const std::shared_ptr& actualSchema, const TString& insertDenyReason, const std::optional& defaultExists /*= {}*/) : TBase(incoming, actualSchema) - , Builder(actualSchema->GetIndexInfo().ArrowSchema()->fields()) + , Builder({ actualSchema->GetIndexInfo().ArrowSchema().begin(), actualSchema->GetIndexInfo().ArrowSchema().end() }) , DefaultExists(defaultExists) - , InsertDenyReason(insertDenyReason) -{ - for (auto&& f : actualSchema->GetIndexInfo().ArrowSchema()->fields()) { + , InsertDenyReason(insertDenyReason) { + for (auto&& f : actualSchema->GetIndexInfo().ArrowSchema()) { auto fIdx = IncomingData->schema()->GetFieldIndex(f->name()); if (fIdx == -1) { IncomingColumnRemap.emplace_back(); @@ -86,5 +85,4 @@ TUpdateMerger::TUpdateMerger(const std::shared_ptr& incoming } } } - } diff --git a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp index 84def7cbdc04..5990cc70d930 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp @@ -155,7 +155,7 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr& /*ta const auto& indexSchema = Context.GetActualSchema()->GetIndexInfo().ArrowSchema(); auto subsetConclusion = NArrow::TColumnOperator().IgnoreOnDifferentFieldTypes().BuildSequentialSubset(OriginalBatch, indexSchema); if (subsetConclusion.IsFail()) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unadaptable schemas")("index", indexSchema->ToString())( + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unadaptable schemas")("index", indexSchema.ToString())( "problem", subsetConclusion.GetErrorMessage()); ReplyError("unadaptable schema: " + subsetConclusion.GetErrorMessage(), NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal); @@ -163,13 +163,13 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr& /*ta } NArrow::TSchemaSubset subset = subsetConclusion.DetachResult(); - if (OriginalBatch->num_columns() != indexSchema->num_fields()) { - AFL_VERIFY(OriginalBatch->num_columns() < indexSchema->num_fields())("original", OriginalBatch->num_columns())( - "index", indexSchema->num_fields()); + if (OriginalBatch->num_columns() != indexSchema.num_fields()) { + AFL_VERIFY(OriginalBatch->num_columns() < indexSchema.num_fields())("original", OriginalBatch->num_columns())( + "index", indexSchema.num_fields()); if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard() && WriteData.GetWriteMeta().GetModificationType() != NEvWrite::EModificationType::Delete) { subset = NArrow::TSchemaSubset::AllFieldsAccepted(); - const std::vector& columnIdsVector = Context.GetActualSchema()->GetIndexInfo().GetColumnIds(false); + const auto columnIdsVector = Context.GetActualSchema()->GetIndexInfo().GetColumnIds(false); const std::set columnIdsSet(columnIdsVector.begin(), columnIdsVector.end()); auto normalized = Context.GetActualSchema() diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp index a55f13241b65..ae6f4bbb0ca3 100644 --- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp @@ -428,25 +428,26 @@ namespace NKikimr::NColumnShard { NOlap::TIndexInfo BuildTableInfo(const std::vector& ydbSchema, const std::vector& key) { THashMap columns; - THashMap columnByName; + THashMap columnIdByName; for (ui32 i = 0; i < ydbSchema.size(); ++i) { ui32 id = i + 1; auto& name = ydbSchema[i].GetName(); auto& type = ydbSchema[i].GetType(); columns[id] = NTable::TColumn(name, id, type, ""); - AFL_VERIFY(columnByName.emplace(name, &columns[id]).second); + AFL_VERIFY(columnIdByName.emplace(name, id).second); } - std::vector pkNames; + std::vector pkIds; ui32 idx = 0; for (const auto& c : key) { - auto it = columnByName.find(c.GetName()); - AFL_VERIFY(it != columnByName.end()); - it->second->KeyOrder = idx++; - pkNames.push_back(c.GetName()); + auto it = columnIdByName.FindPtr(c.GetName()); + AFL_VERIFY(it); + AFL_VERIFY(*it < columns.size()); + columns[*it].KeyOrder = idx++; + pkIds.push_back(*it); } - return NOlap::TIndexInfo::BuildDefault(NOlap::TTestStoragesManager::GetInstance(), columns, pkNames); + return NOlap::TIndexInfo::BuildDefault(NOlap::TTestStoragesManager::GetInstance(), columns, pkIds); } void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, const NOlap::TSnapshot& snapshot, bool succeed) { diff --git a/ydb/library/formats/arrow/common/iterator.cpp b/ydb/library/formats/arrow/common/iterator.cpp new file mode 100644 index 000000000000..d2ae67542269 --- /dev/null +++ b/ydb/library/formats/arrow/common/iterator.cpp @@ -0,0 +1,3 @@ +#include "iterator.h" + +namespace NKikimr::NArrow::NUtil {} diff --git a/ydb/library/formats/arrow/common/iterator.h b/ydb/library/formats/arrow/common/iterator.h new file mode 100644 index 000000000000..9fee8c3441cc --- /dev/null +++ b/ydb/library/formats/arrow/common/iterator.h @@ -0,0 +1,84 @@ +#pragma once + +namespace NKikimr::NArrow::NUtil { + +template +class TRandomAccessIteratorClone { +private: + TBase Base; + +public: + using iterator_category = TBase::iterator_category; + using difference_type = TBase::difference_type; + using value_type = TBase::value_type; + using pointer = TBase::pointer; + using reference = TBase::reference; + + TRandomAccessIteratorClone() = default; + TRandomAccessIteratorClone(const TBase& base) + : Base(base) { + } + + bool operator==(const TDerived& other) const { + return Base == other.Base; + } + bool operator!=(const TDerived& other) const { + return Base != other.Base; + } + + TDerived& operator+=(const difference_type& diff) { + Base += diff; + return *static_cast(this); + } + TDerived& operator-=(const difference_type& diff) { + Base -= diff; + return *static_cast(this); + } + TDerived& operator++() { + ++Base; + return *static_cast(this); + } + TDerived& operator--() { + --Base; + return *static_cast(this); + } + TDerived operator++(int) { + auto ret = *static_cast(this); + ++Base; + return ret; + } + TDerived operator--(int) { + auto ret = *static_cast(this); + --Base; + return ret; + } + TDerived operator+(const difference_type& diff) { + return Base + diff; + } + TDerived operator-(const difference_type& diff) { + return Base - diff; + } + + difference_type operator-(const TDerived& other) { + return Base - other.Base; + } + + reference operator*() { + return *Base; + } + const reference operator*() const { + return *Base; + } + pointer operator->() { + return &*Base; + } + + pointer getPtr() const { + return Base.getPtr(); + } + const pointer getConstPtr() const { + return Base.getConstPtr(); + } +}; + +} // namespace NKikimr::NArrow::NUtil diff --git a/ydb/library/formats/arrow/modifier/schema.h b/ydb/library/formats/arrow/modifier/schema.h index 1d90167c0979..7dc06a40626e 100644 --- a/ydb/library/formats/arrow/modifier/schema.h +++ b/ydb/library/formats/arrow/modifier/schema.h @@ -1,7 +1,9 @@ #pragma once #include +#include #include #include +#include #include #include @@ -14,9 +16,16 @@ class TSchemaLite { public: TSchemaLite() = default; - TSchemaLite(const std::shared_ptr& schema) { - AFL_VERIFY(schema); - Fields = schema->fields(); + TSchemaLite(const std::shared_ptr& schema) + : Fields(TValidator::CheckNotNull(schema)->fields()) { + } + + TSchemaLite(std::vector>&& fields) + : Fields(std::move(fields)) { + } + + TSchemaLite(const std::vector>& fields) + : Fields(fields) { } const std::shared_ptr& field(const ui32 index) const { @@ -78,14 +87,80 @@ class TSchemaLite { } return Default>(); } +}; - TSchemaLite(std::vector>&& fields) - : Fields(std::move(fields)) { +class TSchemaLiteView: private TNonCopyable { +private: + using TFields = std::span>; + TFields Fields; + + class TIterator: public NUtil::TRandomAccessIteratorClone { + using TBase = NUtil::TRandomAccessIteratorClone; + public: + using TBase::TRandomAccessIteratorClone; + }; + +public: + TSchemaLiteView() = default; + TSchemaLiteView(const TSchemaLite& schema) + : Fields(schema.fields()) { } - TSchemaLite(const std::vector>& fields) + TSchemaLiteView(const std::span>& fields) : Fields(fields) { } + + std::shared_ptr field(const ui32 index) const { + return GetFieldByIndexVerified(index); + } + + TIterator begin() const { + return Fields.begin(); + } + + TIterator end() const { + return Fields.end(); + } + + int num_fields() const { + return Fields.size(); + } + + std::vector field_names() const { + std::vector result; + result.reserve(Fields.size()); + for (auto&& f : Fields) { + result.emplace_back(f->name()); + } + return result; + } + + TString DebugString() const { + TStringBuilder sb; + sb << "["; + for (auto&& f : Fields) { + sb << f->ToString() << ";"; + } + sb << "]"; + + return sb; + } + + TString ToString() const { + return DebugString(); + } + + const std::shared_ptr& GetFieldByIndexVerified(const ui32 index) const { + AFL_VERIFY(index < Fields.size()); + return Fields[index]; + } + + const std::shared_ptr& GetFieldByIndexOptional(const ui32 index) const { + if (index < Fields.size()) { + return Fields[index]; + } + return Default>(); + } }; } // namespace NKikimr::NArrow diff --git a/ydb/library/formats/arrow/modifier/subset.h b/ydb/library/formats/arrow/modifier/subset.h index ddc01e9ad803..49ef50500fcf 100644 --- a/ydb/library/formats/arrow/modifier/subset.h +++ b/ydb/library/formats/arrow/modifier/subset.h @@ -1,7 +1,9 @@ #pragma once -#include -#include #include +#include +#include + +#include namespace NKikimr::NArrow { @@ -24,23 +26,25 @@ class TSchemaSubset { return result; } - template - std::vector Apply(const std::vector& fullSchema) const { + template + std::vector::value_type> Apply(TIterator begin, TIterator end) const { + using TValue = std::iterator_traits::value_type; if (FieldIdx.empty()) { - return fullSchema; + return {std::move(begin), std::move(end)}; } - std::vector fields; + std::vector fields; + const ui64 size = end - begin; if (!Exclude) { for (auto&& i : FieldIdx) { - AFL_VERIFY(i < fullSchema.size()); - fields.emplace_back(fullSchema[i]); + AFL_VERIFY(i < size); + fields.emplace_back(*(begin + i)); } } else { auto it = FieldIdx.begin(); - for (ui32 i = 0; i < fullSchema.size(); ++i) { + for (ui32 i = 0; i < size; ++i) { if (it == FieldIdx.end() || i < *it) { - AFL_VERIFY(i < fullSchema.size()); - fields.emplace_back(fullSchema[i]); + AFL_VERIFY(i < size); + fields.emplace_back(*(begin + i)); } else if (i == *it) { ++it; } else {