From 2b66a35f18b66d3a1bd297a89f808ffd6d142062 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 19 Jul 2024 08:54:09 +0300 Subject: [PATCH] Refactoring use general container (#6836) --- ydb/core/formats/arrow/common/accessor.cpp | 13 ++ ydb/core/formats/arrow/common/accessor.h | 8 + ydb/core/formats/arrow/common/adapter.h | 2 +- ydb/core/formats/arrow/common/container.cpp | 140 +++++++++++++++--- ydb/core/formats/arrow/common/container.h | 63 +++++--- ydb/core/formats/arrow/modifier/schema.cpp | 69 +++++++++ ydb/core/formats/arrow/modifier/schema.h | 55 +++++++ ydb/core/formats/arrow/modifier/ya.make | 14 ++ ydb/core/formats/arrow/program.cpp | 2 +- ydb/core/formats/arrow/ya.make | 1 + .../engines/changes/general_compaction.cpp | 31 ++-- .../engines/changes/general_compaction.h | 2 +- .../engines/changes/indexation.cpp | 45 +++--- .../columnshard/engines/changes/indexation.h | 3 +- .../engines/portions/portion_info.cpp | 31 +++- .../engines/portions/portion_info.h | 5 + .../engines/portions/read_with_blobs.cpp | 36 ++--- .../engines/portions/read_with_blobs.h | 3 +- .../plain_reader/iterator/fetched_data.h | 2 +- .../reader/plain_reader/iterator/fetching.cpp | 8 +- .../reader/plain_reader/iterator/merge.cpp | 2 +- .../reader/plain_reader/iterator/source.cpp | 9 +- .../engines/scheme/abstract/index_info.cpp | 35 ++--- .../engines/scheme/abstract/index_info.h | 7 +- .../scheme/versions/abstract_scheme.cpp | 27 ++-- .../engines/scheme/versions/abstract_scheme.h | 5 +- 26 files changed, 455 insertions(+), 163 deletions(-) create mode 100644 ydb/core/formats/arrow/modifier/schema.cpp create mode 100644 ydb/core/formats/arrow/modifier/schema.h create mode 100644 ydb/core/formats/arrow/modifier/ya.make diff --git a/ydb/core/formats/arrow/common/accessor.cpp b/ydb/core/formats/arrow/common/accessor.cpp index 9865b2a692f7..775cffa95bab 100644 --- a/ydb/core/formats/arrow/common/accessor.cpp +++ b/ydb/core/formats/arrow/common/accessor.cpp @@ -1,4 +1,5 @@ #include "accessor.h" +#include #include #include #include @@ -94,6 +95,10 @@ class TChunkAccessor { } +std::optional TTrivialArray::DoGetRawSize() const { + return NArrow::GetArrayDataSize(Array); +} + std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const { AFL_VERIFY(StartPosition <= position); AFL_VERIFY(position < FinishPosition); @@ -119,4 +124,12 @@ IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(const std:: return SelectChunk(chunkCurrent, position, accessor); } +std::optional TTrivialChunkedArray::DoGetRawSize() const { + ui64 result = 0; + for (auto&& i : Array->chunks()) { + result += NArrow::GetArrayDataSize(i); + } + return result; +} + } diff --git a/ydb/core/formats/arrow/common/accessor.h b/ydb/core/formats/arrow/common/accessor.h index 3765d726992b..702f13fcc6f4 100644 --- a/ydb/core/formats/arrow/common/accessor.h +++ b/ydb/core/formats/arrow/common/accessor.h @@ -84,6 +84,7 @@ class IChunkedArray { YDB_READONLY_DEF(std::shared_ptr, DataType); YDB_READONLY(ui64, RecordsCount, 0); YDB_READONLY(EType, Type, EType::Undefined); + virtual std::optional DoGetRawSize() const = 0; protected: virtual std::shared_ptr DoGetChunkedArray() const = 0; virtual TCurrentChunkAddress DoGetChunk(const std::optional& chunkCurrent, const ui64 position) const = 0; @@ -156,6 +157,10 @@ class IChunkedArray { TString DebugString(const ui32 position) const; }; + std::optional GetRawSize() const { + return DoGetRawSize(); + } + std::shared_ptr GetChunkedArray() const { return DoGetChunkedArray(); } @@ -180,6 +185,8 @@ class TTrivialArray: public IChunkedArray { using TBase = IChunkedArray; const std::shared_ptr Array; protected: + virtual std::optional DoGetRawSize() const override; + virtual TCurrentChunkAddress DoGetChunk(const std::optional& /*chunkCurrent*/, const ui64 /*position*/) const override { return TCurrentChunkAddress(Array, 0, 0); } @@ -204,6 +211,7 @@ class TTrivialChunkedArray: public IChunkedArray { virtual std::shared_ptr DoGetChunkedArray() const override { return Array; } + virtual std::optional DoGetRawSize() const override; public: TTrivialChunkedArray(const std::shared_ptr& data) diff --git a/ydb/core/formats/arrow/common/adapter.h b/ydb/core/formats/arrow/common/adapter.h index 543e78511146..1b368e38de50 100644 --- a/ydb/core/formats/arrow/common/adapter.h +++ b/ydb/core/formats/arrow/common/adapter.h @@ -91,7 +91,7 @@ class TDataBuilderPolicy { return batch; } [[nodiscard]] static std::shared_ptr ApplyArrowFilter(const std::shared_ptr& batch, const std::shared_ptr& filter) { - auto table = batch->BuildTable(); + auto table = batch->BuildTableVerified(); return std::make_shared(TDataBuilderPolicy::ApplyArrowFilter(table, filter)); } [[nodiscard]] static std::shared_ptr GetEmptySame(const std::shared_ptr& batch) { diff --git a/ydb/core/formats/arrow/common/container.cpp b/ydb/core/formats/arrow/common/container.cpp index ccf8dc71fb0c..ad0215737bc8 100644 --- a/ydb/core/formats/arrow/common/container.cpp +++ b/ydb/core/formats/arrow/common/container.cpp @@ -1,50 +1,60 @@ #include "container.h" #include +#include #include namespace NKikimr::NArrow { -NKikimr::TConclusionStatus TGeneralContainer::MergeColumnsStrictly(const TGeneralContainer& container) { - if (RecordsCount != container.RecordsCount) { +TConclusionStatus TGeneralContainer::MergeColumnsStrictly(const TGeneralContainer& container) { + if (!container.RecordsCount) { + return TConclusionStatus::Success(); + } + if (!RecordsCount) { + RecordsCount = container.RecordsCount; + } + if (*RecordsCount != *container.RecordsCount) { return TConclusionStatus::Fail(TStringBuilder() << "inconsistency records count in additional container: " << container.GetSchema()->ToString() << ". expected: " << RecordsCount << ", reality: " << container.GetRecordsCount()); } for (i32 i = 0; i < container.Schema->num_fields(); ++i) { auto addFieldResult = AddField(container.Schema->field(i), container.Columns[i]); - if (!addFieldResult) { + if (addFieldResult.IsFail()) { return addFieldResult; } } return TConclusionStatus::Success(); } -NKikimr::TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr& f, const std::shared_ptr& data) { +TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr& f, const std::shared_ptr& data) { AFL_VERIFY(f); AFL_VERIFY(data); - if (data->GetRecordsCount() != RecordsCount) { + if (RecordsCount && data->GetRecordsCount() != *RecordsCount) { return TConclusionStatus::Fail(TStringBuilder() << "inconsistency records count in new column: " << f->name() << ". expected: " << RecordsCount << ", reality: " << data->GetRecordsCount()); } if (!data->GetDataType()->Equals(f->type())) { return TConclusionStatus::Fail("schema and data type are not equals: " + data->GetDataType()->ToString() + " vs " + f->type()->ToString()); } - if (Schema->GetFieldByName(f->name())) { - return TConclusionStatus::Fail("field name duplication: " + f->name()); - } - auto resultAdd = Schema->AddField(Schema->num_fields(), f); - if (!resultAdd.ok()) { - return TConclusionStatus::Fail("internal schema error on add field: " + resultAdd.status().ToString()); + { + auto conclusion = Schema->AddField(f); + if (conclusion.IsFail()) { + return conclusion; + } } - Schema = *resultAdd; + RecordsCount = data->GetRecordsCount(); Columns.emplace_back(data); return TConclusionStatus::Success(); } -TGeneralContainer::TGeneralContainer(const std::shared_ptr& schema, std::vector>&& columns) - : Schema(schema) - , Columns(std::move(columns)) -{ - AFL_VERIFY(schema); +TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr& f, const std::shared_ptr& data) { + return AddField(f, std::make_shared(data)); +} + +TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr& f, const std::shared_ptr& data) { + return AddField(f, std::make_shared(data)); +} + +void TGeneralContainer::Initialize() { std::optional recordsCount; AFL_VERIFY(Schema->num_fields() == (i32)Columns.size())("schema", Schema->num_fields())("columns", Columns.size()); for (i32 i = 0; i < Schema->num_fields(); ++i) { @@ -58,12 +68,34 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr& schem } } AFL_VERIFY(recordsCount); + AFL_VERIFY(!RecordsCount || *RecordsCount == *recordsCount); RecordsCount = *recordsCount; } +TGeneralContainer::TGeneralContainer(const std::vector>& fields, std::vector>&& columns) + : Schema(std::make_shared(fields)) + , Columns(std::move(columns)) +{ + Initialize(); +} + +TGeneralContainer::TGeneralContainer(const std::shared_ptr& schema, std::vector>&& columns) + : Schema(std::make_shared(schema)) + , Columns(std::move(columns)) +{ + Initialize(); +} + +TGeneralContainer::TGeneralContainer(const std::shared_ptr& schema, std::vector>&& columns) + : Schema(std::make_shared(schema)) + , Columns(std::move(columns)) +{ + Initialize(); +} + TGeneralContainer::TGeneralContainer(const std::shared_ptr& table) { AFL_VERIFY(table); - Schema = table->schema(); + Schema = std::make_shared(table->schema()); RecordsCount = table->num_rows(); for (auto&& i : table->columns()) { if (i->num_chunks() == 1) { @@ -72,15 +104,17 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr& table) Columns.emplace_back(std::make_shared(i)); } } + Initialize(); } TGeneralContainer::TGeneralContainer(const std::shared_ptr& table) { AFL_VERIFY(table); - Schema = table->schema(); + Schema = std::make_shared(table->schema()); RecordsCount = table->num_rows(); for (auto&& i : table->columns()) { Columns.emplace_back(std::make_shared(i)); } + Initialize(); } std::shared_ptr TGeneralContainer::GetAccessorByNameVerified(const std::string& fieldId) const { @@ -110,14 +144,78 @@ std::shared_ptr TGeneralContainer::BuildTableOptional(const std::o if (fields.empty()) { return nullptr; } - return arrow::Table::Make(std::make_shared(fields), columns, RecordsCount); + AFL_VERIFY(RecordsCount); + return arrow::Table::Make(std::make_shared(fields), columns, *RecordsCount); } -std::shared_ptr TGeneralContainer::BuildTable(const std::optional>& columnNames /*= {}*/) const { +std::shared_ptr TGeneralContainer::BuildTableVerified(const std::optional>& columnNames /*= {}*/) const { auto result = BuildTableOptional(columnNames); AFL_VERIFY(result); AFL_VERIFY(!columnNames || result->schema()->num_fields() == (i32)columnNames->size()); return result; } +std::shared_ptr TGeneralContainer::GetAccessorByNameOptional(const std::string& fieldId) const { + int idx = Schema->GetFieldIndex(fieldId); + if (idx == -1) { + return nullptr; + } + AFL_VERIFY((ui32)idx < Columns.size())("idx", idx)("count", Columns.size()); + return Columns[idx]; +} + +TConclusionStatus TGeneralContainer::SyncSchemaTo(const std::shared_ptr& schema, const IFieldsConstructor* defaultFieldsConstructor, const bool forceDefaults) { + std::shared_ptr schemaNew = std::make_shared(); + std::vector> columnsNew; + if (!RecordsCount) { + return TConclusionStatus::Fail("original container has not data"); + } + for (auto&& i : schema->fields()) { + const int idx = Schema->GetFieldIndex(i->name()); + if (idx == -1) { + if (!defaultFieldsConstructor) { + return TConclusionStatus::Fail("haven't field for sync: '" + i->name() + "'"); + } else { + schemaNew->AddField(i).Validate(); + auto defConclusion = defaultFieldsConstructor->GetDefaultColumnElementValue(i, forceDefaults); + if (defConclusion.IsFail()) { + return defConclusion; + } + columnsNew.emplace_back(std::make_shared(NArrow::TThreadSimpleArraysCache::Get(i->type(), *defConclusion, *RecordsCount))); + } + } else { + const auto& fOwned = Schema->GetFieldVerified(idx); + if (!fOwned->type()->Equals(i->type())) { + return TConclusionStatus::Fail("different field types for '" + i->name() + "'. Have " + fOwned->type()->ToString() + ", need " + i->type()->ToString()); + } + schemaNew->AddField(fOwned).Validate(); + columnsNew.emplace_back(Columns[idx]); + } + } + std::swap(Schema, schemaNew); + std::swap(columnsNew, Columns); + return TConclusionStatus::Success(); +} + +TString TGeneralContainer::DebugString() const { + TStringBuilder result; + if (RecordsCount) { + result << "records_count=" << *RecordsCount << ";"; + } + result << "schema=" << Schema->ToString() << ";"; + return result; +} + +TConclusion> IFieldsConstructor::GetDefaultColumnElementValue(const std::shared_ptr& field, const bool force) const { + AFL_VERIFY(field); + auto result = DoGetDefaultColumnElementValue(field->name()); + if (result) { + return result; + } + if (force) { + return NArrow::DefaultScalar(field->type()); + } + return TConclusionStatus::Fail("have not default value for column " + field->name()); +} + } diff --git a/ydb/core/formats/arrow/common/container.h b/ydb/core/formats/arrow/common/container.h index 25262d14ff4a..b92871b96c6a 100644 --- a/ydb/core/formats/arrow/common/container.h +++ b/ydb/core/formats/arrow/common/container.h @@ -1,7 +1,10 @@ #pragma once #include "accessor.h" +#include + #include +#include #include #include @@ -12,50 +15,70 @@ namespace NKikimr::NArrow { +class IFieldsConstructor { +private: + virtual std::shared_ptr DoGetDefaultColumnElementValue(const std::string& fieldName) const = 0; +public: + TConclusion> GetDefaultColumnElementValue(const std::shared_ptr& field, const bool force) const; +}; + class TGeneralContainer { private: - YDB_READONLY(ui64, RecordsCount, 0); - YDB_READONLY_DEF(std::shared_ptr, Schema); + YDB_READONLY_DEF(std::optional, RecordsCount); + YDB_READONLY_DEF(std::shared_ptr, Schema); std::vector> Columns; + void Initialize(); public: - TString DebugString() const { - return TStringBuilder() - << "records_count=" << RecordsCount << ";" - << "schema=" << Schema->ToString() << ";" - ; + TString DebugString() const; + + [[nodiscard]] TConclusionStatus SyncSchemaTo(const std::shared_ptr& schema, + const IFieldsConstructor* defaultFieldsConstructor, const bool forceDefaults); + + bool HasColumn(const std::string& name) { + return Schema->HasField(name); + } + + ui64 num_columns() const { + return Columns.size(); } ui64 num_rows() const { - return RecordsCount; + AFL_VERIFY(RecordsCount); + return *RecordsCount; } - std::shared_ptr BuildTable(const std::optional>& columnNames = {}) const; + ui32 GetColumnsCount() const { + return Columns.size(); + } + + const std::shared_ptr& GetColumnVerified(const ui32 idx) const { + AFL_VERIFY(idx < Columns.size()); + return Columns[idx]; + } + + std::shared_ptr BuildTableVerified(const std::optional>& columnNames = {}) const; std::shared_ptr BuildTableOptional(const std::optional>& columnNames = {}) const; std::shared_ptr BuildEmptySame() const; [[nodiscard]] TConclusionStatus MergeColumnsStrictly(const TGeneralContainer& container); [[nodiscard]] TConclusionStatus AddField(const std::shared_ptr& f, const std::shared_ptr& data); + [[nodiscard]] TConclusionStatus AddField(const std::shared_ptr& f, const std::shared_ptr& data); - TGeneralContainer(const std::shared_ptr& table); + [[nodiscard]] TConclusionStatus AddField(const std::shared_ptr& f, const std::shared_ptr& data); + TGeneralContainer() = default; + TGeneralContainer(const std::shared_ptr& table); TGeneralContainer(const std::shared_ptr& table); - TGeneralContainer(const std::shared_ptr& schema, std::vector>&& columns); + TGeneralContainer(const std::shared_ptr& schema, std::vector>&& columns); + TGeneralContainer(const std::vector>& fields, std::vector>&& columns); arrow::Status ValidateFull() const { return arrow::Status::OK(); } - std::shared_ptr GetAccessorByNameOptional(const std::string& fieldId) const { - for (i32 i = 0; i < Schema->num_fields(); ++i) { - if (Schema->field(i)->name() == fieldId) { - return Columns[i]; - } - } - return nullptr; - } - + std::shared_ptr GetAccessorByNameOptional(const std::string& fieldId) const; std::shared_ptr GetAccessorByNameVerified(const std::string& fieldId) const; }; diff --git a/ydb/core/formats/arrow/modifier/schema.cpp b/ydb/core/formats/arrow/modifier/schema.cpp new file mode 100644 index 000000000000..4cf792614802 --- /dev/null +++ b/ydb/core/formats/arrow/modifier/schema.cpp @@ -0,0 +1,69 @@ +#include "schema.h" +#include +#include + +namespace NKikimr::NArrow::NModifier { + +std::shared_ptr TSchema::Finish() { + AFL_VERIFY(!Finished); + Finished = true; + return std::make_shared(Fields); +} + +const std::shared_ptr& TSchema::GetFieldByName(const std::string& name) const { + AFL_VERIFY(!Finished); + auto it = IndexByName.find(name); + if (it == IndexByName.end()) { + return Default>(); + } else { + return Fields[it->second]; + } +} + +TConclusionStatus TSchema::AddField(const std::shared_ptr& f) { + AFL_VERIFY(!Finished); + if (!IndexByName.emplace(f->name(), Fields.size()).second) { + return TConclusionStatus::Fail("field name duplication: " + f->name()); + } + Fields.emplace_back(f); + return TConclusionStatus::Success(); +} + +TString TSchema::ToString() const { + TStringBuilder result; + for (auto&& i : Fields) { + result << i->ToString() << ";"; + } + return result; +} + +const std::shared_ptr& TSchema::field(const ui32 index) const { + AFL_VERIFY(index < Fields.size()); + return Fields[index]; +} + +const std::shared_ptr& TSchema::GetFieldVerified(const ui32 index) const { + AFL_VERIFY(index < Fields.size()); + return Fields[index]; +} + +void TSchema::Initialize(const std::vector>& fields) { + AFL_VERIFY(!Initialized); + Initialized = true; + for (auto&& i : fields) { + IndexByName.emplace(i->name(), Fields.size()); + Fields.emplace_back(i); + } +} + +TSchema::TSchema(const std::shared_ptr& schema) { + AFL_VERIFY(schema); + Initialize(schema->Fields); +} + +TSchema::TSchema(const std::shared_ptr& schema) { + AFL_VERIFY(schema); + Initialize(schema->fields()); +} + +} \ No newline at end of file diff --git a/ydb/core/formats/arrow/modifier/schema.h b/ydb/core/formats/arrow/modifier/schema.h new file mode 100644 index 000000000000..dc663bad9f6a --- /dev/null +++ b/ydb/core/formats/arrow/modifier/schema.h @@ -0,0 +1,55 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NArrow::NModifier { +class TSchema { +private: + bool Initialized = false; + THashMap IndexByName; + std::vector> Fields; + bool Finished = false; + + void Initialize(const std::vector>& fields); +public: + TSchema() = default; + TSchema(const std::shared_ptr& schema); + + TSchema(const std::shared_ptr& schema); + + TSchema(const std::vector>& fields) { + Initialize(fields); + } + + i32 GetFieldIndex(const std::string& fName) const { + auto it = IndexByName.find(fName); + if (it == IndexByName.end()) { + return -1; + } + return it->second; + } + + const std::vector>& GetFields() const { + return Fields; + } + + TString ToString() const; + + std::shared_ptr Finish(); + [[nodiscard]] TConclusionStatus AddField(const std::shared_ptr& f); + const std::shared_ptr& GetFieldByName(const std::string& name) const; + + bool HasField(const std::string& name) const { + return IndexByName.contains(name); + } + + i32 num_fields() const { + return Fields.size(); + } + + const std::shared_ptr& GetFieldVerified(const ui32 index) const; + + const std::shared_ptr& field(const ui32 index) const; +}; +} \ No newline at end of file diff --git a/ydb/core/formats/arrow/modifier/ya.make b/ydb/core/formats/arrow/modifier/ya.make new file mode 100644 index 000000000000..a3fbb8b6d9ac --- /dev/null +++ b/ydb/core/formats/arrow/modifier/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +PEERDIR( + contrib/libs/apache/arrow + ydb/library/conclusion + ydb/core/formats/arrow/switch + ydb/library/actors/core +) + +SRCS( + schema.cpp +) + +END() diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp index eb71d0731c52..e07f76ed3b49 100644 --- a/ydb/core/formats/arrow/program.cpp +++ b/ydb/core/formats/arrow/program.cpp @@ -919,7 +919,7 @@ std::set TProgramStep::GetColumnsInUsage(const bool originalOnly/* } arrow::Result> TProgramStep::BuildFilter(const std::shared_ptr& t) const { - return BuildFilter(t->BuildTable(GetColumnsInUsage(true))); + return BuildFilter(t->BuildTableVerified(GetColumnsInUsage(true))); } arrow::Result> TProgramStep::BuildFilter(const std::shared_ptr& t) const { diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make index 4615047b39fa..49938a884154 100644 --- a/ydb/core/formats/arrow/ya.make +++ b/ydb/core/formats/arrow/ya.make @@ -12,6 +12,7 @@ PEERDIR( ydb/core/formats/arrow/dictionary ydb/core/formats/arrow/transformer ydb/core/formats/arrow/reader + ydb/core/formats/arrow/modifier ydb/core/formats/arrow/scalar ydb/core/formats/arrow/hash ydb/library/actors/core diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 56b5c2e6477d..6e787b7f5660 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -36,10 +36,9 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches( for (auto&& i : portions) { auto dataSchema = i.GetPortionInfo().GetSchema(context.SchemaVersions); - auto batch = i.GetBatch(dataSchema, *resultSchema); + auto batch = i.RestoreBatch(dataSchema, *resultSchema); batch = resultSchema->NormalizeBatch(*dataSchema, batch).DetachResult(); - batch = IIndexInfo::NormalizeDeletionColumn(batch); - Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey())); + IIndexInfo::NormalizeDeletionColumn(*batch); auto filter = BuildPortionFilter(shardingActual, batch, i.GetPortionInfo(), portionsInUsage, resultSchema); mergeStream.AddSource(batch, filter); } @@ -59,20 +58,23 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches( } std::shared_ptr TGeneralCompactColumnEngineChanges::BuildPortionFilter( - const std::optional& shardingActual, const std::shared_ptr& batch, + const std::optional& shardingActual, const std::shared_ptr& batch, const TPortionInfo& pInfo, const THashSet& portionsInUsage, const ISnapshotSchema::TPtr& resultSchema) const { std::shared_ptr filter; + auto table = batch->BuildTableVerified(); if (shardingActual && pInfo.NeedShardingFilter(*shardingActual)) { - filter = shardingActual->GetShardingInfo()->GetFilter(batch); + filter = shardingActual->GetShardingInfo()->GetFilter(table); } NArrow::TColumnFilter filterDeleted = NArrow::TColumnFilter::BuildAllowFilter(); if (pInfo.GetMeta().GetDeletionsCount()) { - auto col = batch->GetColumnByName(TIndexInfo::SPEC_COL_DELETE_FLAG); + auto col = table->GetColumnByName(TIndexInfo::SPEC_COL_DELETE_FLAG); AFL_VERIFY(col); AFL_VERIFY(col->type()->id() == arrow::Type::BOOL); - auto bCol = static_pointer_cast(col); - for (ui32 i = 0; i < bCol->length(); ++i) { - filterDeleted.Add(!bCol->GetView(i)); + for (auto&& c : col->chunks()) { + auto bCol = static_pointer_cast(c); + for (ui32 i = 0; i < bCol->length(); ++i) { + filterDeleted.Add(!bCol->GetView(i)); + } } NArrow::TColumnFilter filterCorrection = NArrow::TColumnFilter::BuildDenyFilter(); auto pkSchema = resultSchema->GetIndexInfo().GetReplaceKey(); @@ -151,23 +153,20 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks( ui32 idx = 0; for (auto&& i : portions) { auto dataSchema = i.GetPortionInfo().GetSchema(context.SchemaVersions); - auto batch = i.GetBatch(dataSchema, *resultSchema, pkFieldNamesSet); + auto batch = i.RestoreBatch(dataSchema, *resultSchema, pkFieldNamesSet); { NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared>>( portionIdFieldName, idx++); - batch = NArrow::TStatusValidator::GetValid( - batch->AddColumn(batch->num_columns(), portionIdField, column->BuildArray(batch->num_rows()))); + batch->AddField(portionIdField, column->BuildArray(batch->num_rows())).Validate(); } { NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared>>( portionRecordIndexFieldName); - batch = NArrow::TStatusValidator::GetValid( - batch->AddColumn(batch->num_columns(), portionRecordIndexField, column->BuildArray(batch->num_rows()))); + batch->AddField(portionRecordIndexField, column->BuildArray(batch->num_rows())).Validate(); } - batch = IIndexInfo::NormalizeDeletionColumn(batch); - Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey())); + IIndexInfo::NormalizeDeletionColumn(*batch); std::shared_ptr filter = BuildPortionFilter(shardingActual, batch, i.GetPortionInfo(), usedPortionIds, resultSchema); mergeStream.AddSource(batch, filter); diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index 75dc35317630..d378b4829b08 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -14,7 +14,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges { void BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector&& portions) noexcept; std::shared_ptr BuildPortionFilter(const std::optional& shardingActual, - const std::shared_ptr& batch, const TPortionInfo& pInfo, const THashSet& portionsInUsage, + const std::shared_ptr& batch, const TPortionInfo& pInfo, const THashSet& portionsInUsage, const ISnapshotSchema::TPtr& resultSchema) const; protected: virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override; diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 6207a8883376..5c9e911dea54 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -54,10 +54,10 @@ namespace { class TBatchInfo { private: - YDB_READONLY_DEF(std::shared_ptr, Batch); + YDB_READONLY_DEF(std::shared_ptr, Batch); const NEvWrite::EModificationType ModificationType; public: - TBatchInfo(const std::shared_ptr& batch, const NEvWrite::EModificationType modificationType) + TBatchInfo(const std::shared_ptr& batch, const NEvWrite::EModificationType modificationType) : Batch(batch) , ModificationType(modificationType) { @@ -85,7 +85,7 @@ class TPathData { return HasDeletionFlag; } - void AddBatch(const NOlap::TInsertedData& data, const std::shared_ptr& batch) { + void AddBatch(const NOlap::TInsertedData& data, const std::shared_ptr& batch) { if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) { HasDeletionFlag = true; } @@ -109,8 +109,8 @@ class TPathData { for (auto&& batch : Batches) { auto& forMerge = batch.GetBatch(); stream.AddSource(forMerge, nullptr); - for (ui32 cIdx = 0; cIdx < (ui32)forMerge->num_columns(); ++cIdx) { - fieldSizes[forMerge->column_name(cIdx)] += NArrow::GetArrayDataSize(forMerge->column(cIdx)); + for (ui32 cIdx = 0; cIdx < (ui32)forMerge->GetColumnsCount(); ++cIdx) { + fieldSizes[forMerge->GetSchema()->GetFieldVerified(cIdx)->name()] += forMerge->GetColumnVerified(cIdx)->GetRawSize().value_or(0); } rowsCount += forMerge->num_rows(); } @@ -131,7 +131,8 @@ class TPathesData { return Data; } - void Add(const NOlap::TInsertedData& inserted, const std::optional& info, const std::shared_ptr& batch) { + void Add(const NOlap::TInsertedData& inserted, const std::optional& info, + const std::shared_ptr& batch) { auto it = Data.find(inserted.PathId); if (it == Data.end()) { it = Data.emplace(inserted.PathId, info).first; @@ -169,22 +170,18 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont auto& indexInfo = blobSchema->GetIndexInfo(); Y_ABORT_UNLESS(indexInfo.IsSorted()); - std::shared_ptr batch; + std::shared_ptr batch; { const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange); Y_ABORT_UNLESS(blobData.size(), "Blob data not present"); // Prepare batch - batch = NArrow::DeserializeBatch(blobData, indexInfo.ArrowSchema()); - AFL_VERIFY(batch)("event", "cannot_parse") - ("data_snapshot", TStringBuilder() << inserted.GetSnapshot()) - ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot()); - ; + batch = std::make_shared(NArrow::DeserializeBatch(blobData, indexInfo.ArrowSchema())); + AFL_VERIFY(batch)("event", "cannot_parse")("data_snapshot", inserted.GetSnapshot())("index_snapshot", blobSchema->GetSnapshot()); } - batch = AddSpecials(batch, indexInfo, inserted); + AddSpecials(*batch, indexInfo, inserted); batch = resultSchema->NormalizeBatch(*blobSchema, batch).DetachResult(); pathBatches.Add(inserted, shardingFilterCommit, batch); - Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, resultSchema->GetIndexInfo().GetReplaceKey())); } Y_ABORT_UNLESS(Blobs.IsEmpty()); @@ -192,20 +189,23 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont for (auto& [pathId, pathInfo] : pathBatches.GetData()) { auto shardingFilter = context.SchemaVersions.GetShardingInfoActual(pathId); auto mergedBatch = pathInfo.Merge(resultSchema->GetIndexInfo()); + Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(mergedBatch, resultSchema->GetIndexInfo().GetReplaceKey())); auto itGranule = PathToGranule.find(pathId); AFL_VERIFY(itGranule != PathToGranule.end()); - std::vector> result = NArrow::NMerger::TRWSortableBatchPosition:: - SplitByBordersInSequentialContainer(mergedBatch, comparableColumns, itGranule->second); + std::vector> result = + NArrow::NMerger::TRWSortableBatchPosition::SplitByBordersInSequentialContainer(mergedBatch, comparableColumns, itGranule->second); for (auto&& b : result) { if (!b) { continue; } std::optional externalSaver; if (b->num_rows() < 100) { - externalSaver = NArrow::NSerialization::TSerializerContainer(std::make_shared(arrow::Compression::type::UNCOMPRESSED)); + externalSaver = NArrow::NSerialization::TSerializerContainer( + std::make_shared(arrow::Compression::type::UNCOMPRESSED)); } else { - externalSaver = NArrow::NSerialization::TSerializerContainer(std::make_shared(arrow::Compression::type::LZ4_FRAME)); + externalSaver = NArrow::NSerialization::TSerializerContainer( + std::make_shared(arrow::Compression::type::LZ4_FRAME)); } auto portions = MakeAppendedPortions(b, pathId, maxSnapshot, nullptr, context, externalSaver); Y_ABORT_UNLESS(portions.size()); @@ -222,11 +222,10 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont return TConclusionStatus::Success(); } -std::shared_ptr TInsertColumnEngineChanges::AddSpecials(const std::shared_ptr& srcBatch, - const TIndexInfo& indexInfo, const TInsertedData& inserted) const { - auto batch = IIndexInfo::AddSnapshotColumns(srcBatch, inserted.GetSnapshot()); - batch = IIndexInfo::AddDeleteFlagsColumn(batch, inserted.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete); - return NArrow::TColumnOperator().Adapt(batch, indexInfo.ArrowSchemaWithSpecials()).DetachResult(); +void TInsertColumnEngineChanges::AddSpecials( + NArrow::TGeneralContainer& batch, const TIndexInfo& indexInfo, const TInsertedData& inserted) const { + IIndexInfo::AddSnapshotColumns(batch, inserted.GetSnapshot()); + IIndexInfo::AddDeleteFlagsColumn(batch, inserted.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete); } NColumnShard::ECumulativeCounters TInsertColumnEngineChanges::GetCounterIndex(const bool isSuccess) const { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h index 95befd334c23..6702d10621f4 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.h +++ b/ydb/core/tx/columnshard/engines/changes/indexation.h @@ -10,8 +10,7 @@ namespace NKikimr::NOlap { class TInsertColumnEngineChanges: public TChangesWithAppend { private: using TBase = TChangesWithAppend; - std::shared_ptr AddSpecials(const std::shared_ptr& srcBatch, - const TIndexInfo& indexInfo, const TInsertedData& inserted) const; + void AddSpecials(NArrow::TGeneralContainer& batch, const TIndexInfo& indexInfo, const TInsertedData& inserted) const; std::vector DataToIndex; protected: virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 492c099ceae7..3091b5a61068 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -684,6 +684,24 @@ bool TPortionInfo::NeedShardingFilter(const TGranuleShardingInfo& shardingInfo) return true; } +std::shared_ptr TPortionInfo::TPreparedColumn::AssembleAccessor() const { + Y_ABORT_UNLESS(!Blobs.empty()); + + std::vector> chunks; + chunks.reserve(Blobs.size()); + for (auto& blob : Blobs) { + auto batch = blob.BuildRecordBatch(*Loader); + Y_ABORT_UNLESS(batch); + AFL_VERIFY(batch->num_columns() == 1); + chunks.emplace_back(batch->column(0)); + } + if (chunks.size() > 1) { + return std::make_shared(NArrow::TStatusValidator::GetValid(arrow::ChunkedArray::Make(chunks))); + } else { + return std::make_shared(chunks.front()); + } +} + std::shared_ptr TPortionInfo::TPreparedColumn::AssembleForSeqAccess() const { Y_ABORT_UNLESS(!Blobs.empty()); @@ -752,7 +770,18 @@ std::shared_ptr TPortionInfo::TPreparedBatchData::Ass fields.emplace_back(i.GetField()); } - return std::make_shared(std::make_shared(fields), std::move(columns)); + return std::make_shared(fields, std::move(columns)); +} + +std::shared_ptr TPortionInfo::TPreparedBatchData::AssembleToGeneralContainer() const { + std::vector> columns; + std::vector> fields; + for (auto&& i : Columns) { + columns.emplace_back(i.AssembleAccessor()); + fields.emplace_back(i.GetField()); + } + + return std::make_shared(fields, std::move(columns)); } std::shared_ptr TPortionInfo::TPreparedBatchData::AssembleTable(const TAssembleOptions& options) const { diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 4d98f9c15c37..b4f06b16597e 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -65,6 +65,9 @@ class TDeserializeChunkedArray: public NArrow::NAccessor::IChunkedArray { std::shared_ptr Loader; std::vector Chunks; protected: + virtual std::optional DoGetRawSize() const override { + return {}; + } virtual TCurrentChunkAddress DoGetChunk(const std::optional& chunkCurrent, const ui64 position) const override; virtual std::shared_ptr DoGetChunkedArray() const override { AFL_VERIFY(false); @@ -683,6 +686,7 @@ class TPortionInfo { std::shared_ptr Assemble() const; std::shared_ptr AssembleForSeqAccess() const; + std::shared_ptr AssembleAccessor() const; }; class TPreparedBatchData { @@ -748,6 +752,7 @@ class TPortionInfo { } std::shared_ptr Assemble(const TAssembleOptions& options = {}) const; + std::shared_ptr AssembleToGeneralContainer() const; std::shared_ptr AssembleTable(const TAssembleOptions& options = {}) const; std::shared_ptr AssembleForSeqAccess() const; }; diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp index 69b11dbdab05..7e4164b64da0 100644 --- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp @@ -13,33 +13,19 @@ void TReadPortionInfoWithBlobs::RestoreChunk(const std::shared_ptr TReadPortionInfoWithBlobs::GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set& columnNames) const { +std::shared_ptr TReadPortionInfoWithBlobs::RestoreBatch( + const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set& columnNames) const { Y_ABORT_UNLESS(data); + THashMap blobs; + for (auto&& i : PortionInfo.Records) { + blobs[i.GetAddress()] = GetBlobByAddressVerified(i.ColumnId, i.Chunk); + Y_ABORT_UNLESS(blobs[i.GetAddress()].size() == i.BlobRange.Size); + } if (columnNames.empty()) { - if (!CachedBatch) { - THashMap blobs; - for (auto&& i : PortionInfo.Records) { - blobs[i.GetAddress()] = GetBlobByAddressVerified(i.ColumnId, i.Chunk); - Y_ABORT_UNLESS(blobs[i.GetAddress()].size() == i.BlobRange.Size); - } - CachedBatch = PortionInfo.AssembleInBatch(*data, result, blobs); - Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(*CachedBatch, result.GetIndexInfo().GetReplaceKey())); - } - return *CachedBatch; - } else if (CachedBatch) { - std::vector columnNamesString; - for (auto&& i : columnNames) { - columnNamesString.emplace_back(i.data(), i.size()); - } - return NArrow::TColumnOperator().VerifyIfAbsent().Extract(*CachedBatch, columnNamesString); + return PortionInfo.PrepareForAssemble(*data, result, blobs).AssembleToGeneralContainer(); } else { auto filteredSchema = std::make_shared(data, columnNames); - THashMap blobs; - for (auto&& i : PortionInfo.Records) { - blobs[i.GetAddress()] = GetBlobByAddressVerified(i.ColumnId, i.Chunk); - Y_ABORT_UNLESS(blobs[i.GetAddress()].size() == i.BlobRange.Size); - } - return PortionInfo.AssembleInBatch(*data, *filteredSchema, blobs); + return PortionInfo.PrepareForAssemble(*data, *filteredSchema, blobs).AssembleToGeneralContainer(); } } @@ -143,8 +129,8 @@ std::optional TReadPortionInfoWithBlobs::SyncP TIndexInfo::TSecondaryData secondaryData; secondaryData.MutableExternalData() = entityChunksNew; for (auto&& i : to->GetIndexInfo().GetIndexes()) { - to->GetIndexInfo().AppendIndex(entityChunksNew, i.first, storages, secondaryData).Validate(); - } + to->GetIndexInfo().AppendIndex(entityChunksNew, i.first, storages, secondaryData).Validate(); + } const NSplitter::TEntityGroups groups = to->GetIndexInfo().GetEntityGroupsByStorageId(targetTier, *storages); auto schemaTo = std::make_shared(to, std::make_shared()); diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h index 42fcd8a52f8a..96134c83161b 100644 --- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h @@ -21,7 +21,6 @@ class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs { void RestoreChunk(const std::shared_ptr& chunk); TPortionInfo PortionInfo; - mutable std::optional> CachedBatch; explicit TReadPortionInfoWithBlobs(TPortionInfo&& portionInfo) : PortionInfo(std::move(portionInfo)) { @@ -39,7 +38,7 @@ class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs { static TReadPortionInfoWithBlobs RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo); - std::shared_ptr GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set& columnNames = {}) const; + std::shared_ptr RestoreBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set& columnNames = {}) const; static std::optional SyncPortion(TReadPortionInfoWithBlobs&& source, const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr& storages, std::shared_ptr counters); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h index 1bd31f77dc0e..f68419c88ad9 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h @@ -87,7 +87,7 @@ class TFetchedData { void AddBatch(const std::shared_ptr& table) { AFL_VERIFY(table); if (UseFilter) { - AddBatch(table->BuildTable()); + AddBatch(table->BuildTableVerified()); } else { if (!Table) { Table = table; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp index c9863650ff76..3e83e9d88e0f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp @@ -89,13 +89,14 @@ ui64 TFilterProgramStep::DoPredictRawBytes(const std::shared_ptr& s } TConclusion TPredicateFilter::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - auto filter = source->GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(source->GetStageData().GetTable()->BuildTable()); + auto filter = source->GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(source->GetStageData().GetTable()->BuildTableVerified()); source->MutableStageData().AddFilter(filter); return true; } TConclusion TSnapshotFilter::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - auto filter = MakeSnapshotFilter(source->GetStageData().GetTable()->BuildTable(), source->GetContext()->GetReadMetadata()->GetRequestSnapshot()); + auto filter = MakeSnapshotFilter( + source->GetStageData().GetTable()->BuildTableVerified(), source->GetContext()->GetReadMetadata()->GetRequestSnapshot()); source->MutableStageData().AddFilter(filter); return true; } @@ -119,7 +120,8 @@ TConclusion TDeletionFilter::DoExecuteInplace(const std::shared_ptr TShardingFilter::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { NYDBTest::TControllers::GetColumnShardController()->OnSelectShardingFilter(); - auto filter = source->GetContext()->GetReadMetadata()->GetRequestShardingInfo()->GetShardingInfo()->GetFilter(source->GetStageData().GetTable()->BuildTable()); + auto filter = source->GetContext()->GetReadMetadata()->GetRequestShardingInfo()->GetShardingInfo()->GetFilter( + source->GetStageData().GetTable()->BuildTableVerified()); source->MutableStageData().AddFilter(filter); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp index 6fe0b3992f8e..dfeaec1b7aca 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp @@ -71,7 +71,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() { TMemoryProfileGuard mGuard("SCAN_PROFILE::MERGE::EXCLUSIVE", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); auto& container = Sources.begin()->second->GetStageResult().GetBatch(); if (container && container->num_rows()) { - ResultBatch = container->BuildTable(); + ResultBatch = container->BuildTableVerified(); LastPK = Sources.begin()->second->GetLastPK(); ResultBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); Context->GetCommonContext()->GetCounters().OnNoScanInterval(ResultBatch->num_rows()); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index b9742c17b3e7..c90a3b26a7e3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -240,10 +240,11 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr& AFL_VERIFY(GetStageData().GetBlobs().size() == 1); auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first); auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion()); - auto batch = NArrow::DeserializeBatch(bData, schema); - AFL_VERIFY(batch)("schema", schema->ToString()); - batch = GetContext()->GetReadMetadata()->GetIndexInfo().AddSnapshotColumns(batch, CommittedBlob.GetSnapshot()); - batch = GetContext()->GetReadMetadata()->GetIndexInfo().AddDeleteFlagsColumn(batch, CommittedBlob.GetIsDelete()); + auto rBatch = NArrow::DeserializeBatch(bData, schema); + AFL_VERIFY(rBatch)("schema", schema->ToString()); + auto batch = std::make_shared(rBatch); + GetContext()->GetReadMetadata()->GetIndexInfo().AddSnapshotColumns(*batch, CommittedBlob.GetSnapshot()); + GetContext()->GetReadMetadata()->GetIndexInfo().AddDeleteFlagsColumn(*batch, CommittedBlob.GetIsDelete()); MutableStageData().AddBatch(batch); } MutableStageData().SyncTableColumns(columns->GetSchema()->fields()); diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp index b019da7e132d..f5473eaab885 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp @@ -14,36 +14,25 @@ std::shared_ptr IIndexInfo::GetColumnLoaderVerifi return result; } -std::shared_ptr IIndexInfo::AddDeleteFlagsColumn(const std::shared_ptr& batch, const bool isDelete) { - Y_ABORT_UNLESS(batch); - i64 numColumns = batch->num_columns(); - i64 numRows = batch->num_rows(); +void IIndexInfo::AddDeleteFlagsColumn(NArrow::TGeneralContainer& batch, const bool isDelete) { + const i64 numRows = batch.num_rows(); - AFL_VERIFY(!batch->GetColumnByName(SPEC_COL_DELETE_FLAG)); - return NArrow::TStatusValidator::GetValid(batch->AddColumn(numColumns, arrow::field(SPEC_COL_DELETE_FLAG, arrow::boolean()), - NArrow::TThreadSimpleArraysCache::GetConst(arrow::boolean(), std::make_shared(isDelete), numRows))); + batch.AddField(arrow::field(SPEC_COL_DELETE_FLAG, arrow::boolean()), + NArrow::TThreadSimpleArraysCache::GetConst(arrow::boolean(), std::make_shared(isDelete), numRows)).Validate(); } -std::shared_ptr IIndexInfo::AddSnapshotColumns(const std::shared_ptr& batch, const TSnapshot& snapshot) { - Y_ABORT_UNLESS(batch); - i64 numColumns = batch->num_columns(); - i64 numRows = batch->num_rows(); +void IIndexInfo::AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot) { + const i64 numRows = batch.num_rows(); - auto res = batch->AddColumn(numColumns, arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), - NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows)); - Y_ABORT_UNLESS(res.ok()); - res = (*res)->AddColumn(numColumns + 1, arrow::field(SPEC_COL_TX_ID, arrow::uint64()), - NArrow::MakeUI64Array(snapshot.GetTxId(), numRows)); - Y_ABORT_UNLESS(res.ok()); - Y_ABORT_UNLESS((*res)->num_columns() == numColumns + 2); - return *res; + batch.AddField(arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), NArrow::MakeUI64Array(snapshot.GetPlanStep(), numRows)).Validate(); + batch.AddField(arrow::field(SPEC_COL_TX_ID, arrow::uint64()), NArrow::MakeUI64Array(snapshot.GetTxId(), numRows)).Validate(); } -std::shared_ptr IIndexInfo::NormalizeDeletionColumn(const std::shared_ptr& batch) { - if (batch->schema()->GetFieldIndex(SPEC_COL_DELETE_FLAG) >= 0) { - return batch; +void IIndexInfo::NormalizeDeletionColumn(NArrow::TGeneralContainer& batch) { + if (batch.HasColumn(SPEC_COL_DELETE_FLAG)) { + return; } - return AddDeleteFlagsColumn(batch, false); + AddDeleteFlagsColumn(batch, false); } std::optional IIndexInfo::GetColumnIdOptional(const std::string& name) const { diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h index ab6cd67a3937..3a39def48425 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h @@ -1,6 +1,7 @@ #pragma once #include "loader.h" +#include #include #include @@ -106,10 +107,10 @@ class IIndexInfo { virtual std::shared_ptr GetColumnLoaderOptional(const ui32 columnId) const = 0; std::shared_ptr GetColumnLoaderVerified(const ui32 columnId) const; - static std::shared_ptr NormalizeDeletionColumn(const std::shared_ptr& batch); + static void NormalizeDeletionColumn(NArrow::TGeneralContainer& batch); - static std::shared_ptr AddSnapshotColumns(const std::shared_ptr& batch, const TSnapshot& snapshot); - static std::shared_ptr AddDeleteFlagsColumn(const std::shared_ptr& batch, const bool isDelete); + static void AddSnapshotColumns(NArrow::TGeneralContainer& batch, const TSnapshot& snapshot); + static void AddDeleteFlagsColumn(NArrow::TGeneralContainer& batch, const bool isDelete); static ui64 GetSpecialColumnsRecordSize() { return sizeof(ui64) + sizeof(ui64) + sizeof(bool); diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp index 300952cc3227..ea363d688f95 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp @@ -27,34 +27,33 @@ std::set ISnapshotSchema::GetPkColumnsIds() const { } -TConclusion> ISnapshotSchema::NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr batch) const { +TConclusion> ISnapshotSchema::NormalizeBatch( + const ISnapshotSchema& dataSchema, const std::shared_ptr& batch) const { if (dataSchema.GetSnapshot() == GetSnapshot()) { return batch; } - Y_ABORT_UNLESS(dataSchema.GetSnapshot() < GetSnapshot()); + AFL_VERIFY(dataSchema.GetSnapshot() < GetSnapshot()); const std::shared_ptr& resultArrowSchema = GetSchema(); - std::vector> newColumns; - newColumns.reserve(resultArrowSchema->num_fields()); + std::shared_ptr result = std::make_shared(); for (size_t i = 0; i < resultArrowSchema->fields().size(); ++i) { auto& resultField = resultArrowSchema->fields()[i]; auto columnId = GetIndexInfo().GetColumnId(resultField->name()); - auto oldColumnIndex = dataSchema.GetFieldIndex(columnId); - if (oldColumnIndex >= 0) { // ColumnExists - auto oldColumnInfo = dataSchema.GetFieldByIndex(oldColumnIndex); - Y_ABORT_UNLESS(oldColumnInfo); - auto columnData = batch->GetColumnByName(oldColumnInfo->name()); - Y_ABORT_UNLESS(columnData); - newColumns.push_back(columnData); - } else { // AddNullColumn + auto oldField = dataSchema.GetFieldByColumnIdOptional(columnId); + if (oldField) { + auto conclusion = result->AddField(resultField, batch->GetAccessorByNameVerified(oldField->name())); + if (conclusion.IsFail()) { + return conclusion; + } + } else { auto conclusion = BuildDefaultBatch({ resultField }, batch->num_rows()); if (conclusion.IsFail()) { return conclusion; } - newColumns.push_back((*conclusion)->column(0)); + result->AddField(resultField, (*conclusion)->column(0)).Validate(); } } - return arrow::RecordBatch::Make(resultArrowSchema, batch->num_rows(), newColumns); + return result; } TConclusion> ISnapshotSchema::PrepareForModification( diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h index 91d53230ec38..0eb5aa5968d0 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h @@ -1,4 +1,6 @@ #pragma once +#include + #include #include #include @@ -64,7 +66,8 @@ class ISnapshotSchema { std::set GetPkColumnsIds() const; - [[nodiscard]] TConclusion> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr batch) const; + [[nodiscard]] TConclusion> NormalizeBatch( + const ISnapshotSchema& dataSchema, const std::shared_ptr& batch) const; [[nodiscard]] TConclusion> PrepareForModification( const std::shared_ptr& incomingBatch, const NEvWrite::EModificationType mType) const; };