Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize memory footprint of CS schemas #12593

Merged
merged 4 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions ydb/core/formats/arrow/process_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,24 +207,23 @@ NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
return ReorderImpl(incoming, columnNames);
}
namespace {
template <class TDataContainer, class TSchemaImpl>
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TDataContainer>& srcBatch,
const std::shared_ptr<TSchemaImpl>& dstSchema, const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) {
template <class TDataContainer>
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TDataContainer>& srcBatch, const TSchemaLiteView& dstSchema,
const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) {
AFL_VERIFY(srcBatch);
AFL_VERIFY(dstSchema);
if (dstSchema->num_fields() < srcBatch->schema()->num_fields()) {
if (dstSchema.num_fields() < srcBatch->schema()->num_fields()) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns set: destination must been wider than source")(
"source", srcBatch->schema()->ToString())("destination", dstSchema->ToString());
"source", srcBatch->schema()->ToString())("destination", dstSchema.ToString());
return TConclusionStatus::Fail("incorrect columns set: destination must been wider than source");
}
std::set<ui32> fieldIdx;
auto itSrc = srcBatch->schema()->fields().begin();
auto itDst = dstSchema->fields().begin();
while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema->fields().end()) {
auto itDst = dstSchema.begin();
while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema.end()) {
if ((*itSrc)->name() != (*itDst)->name()) {
++itDst;
} else {
fieldIdx.emplace(itDst - dstSchema->fields().begin());
fieldIdx.emplace(itDst - dstSchema.begin());
if (checkFieldTypesPolicy != TColumnOperator::ECheckFieldTypesPolicy::Ignore && (*itDst)->Equals(*itSrc)) {
switch (checkFieldTypesPolicy) {
case TColumnOperator::ECheckFieldTypesPolicy::Error: {
Expand All @@ -245,25 +244,24 @@ TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TData
++itSrc;
}
}
if (itDst == dstSchema->fields().end() && itSrc != srcBatch->schema()->fields().end()) {
if (itDst == dstSchema.end() && itSrc != srcBatch->schema()->fields().end()) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns order in source set")("source", srcBatch->schema()->ToString())(
"destination", dstSchema->ToString());
"destination", dstSchema.ToString());
return TConclusionStatus::Fail("incorrect columns order in source set");
}
return TSchemaSubset(fieldIdx, dstSchema->num_fields());
return TSchemaSubset(fieldIdx, dstSchema.num_fields());
}
} // namespace

TConclusion<TSchemaSubset> TColumnOperator::BuildSequentialSubset(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema) {
const std::shared_ptr<arrow::RecordBatch>& incoming, const NArrow::TSchemaLiteView& dstSchema) {
return BuildSequentialSubsetImpl(incoming, dstSchema, DifferentColumnTypesPolicy);
}
namespace {
template <class TDataContainer>
TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(const std::shared_ptr<TDataContainer>& incoming,
const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const std::function<i32(const std::string&)>& nameResolver,
const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
const TSchemaLiteView& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const std::function<i32(const std::string&)>& nameResolver, const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
const TColumnOperator::EAbsentFieldPolicy absentColumnPolicy) {
struct TFieldData {
ui32 Index;
Expand All @@ -273,14 +271,13 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(c
}
};
AFL_VERIFY(incoming);
AFL_VERIFY(dstSchema);
std::vector<TFieldData> resultColumns;
resultColumns.reserve(incoming->num_columns());
ui32 idx = 0;
for (auto& srcField : incoming->schema()->fields()) {
const int dstIndex = nameResolver(srcField->name());
if (dstIndex > -1) {
const auto& dstField = dstSchema->GetFieldByIndexVerified(dstIndex);
const auto& dstField = dstSchema.GetFieldByIndexVerified(dstIndex);
switch (differentColumnTypesPolicy) {
case TColumnOperator::ECheckFieldTypesPolicy::Verify:
AFL_VERIFY(dstField->type()->Equals(srcField->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
Expand Down Expand Up @@ -322,14 +319,14 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(c
columns.reserve(resultColumns.size());
fields.reserve(resultColumns.size());
for (auto&& i : resultColumns) {
fields.emplace_back(dstSchema->field(i.Index));
fields.emplace_back(dstSchema.field(i.Index));
columns.emplace_back(i.Column);
}
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), incoming->num_rows());
}
} // namespace
TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::AdaptIncomingToDestinationExt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<TSchemaLite>& dstSchema,
const std::shared_ptr<arrow::RecordBatch>& incoming, const TSchemaLiteView& dstSchema,
const std::function<TConclusionStatus(const ui32, const i32)>& checker, const std::function<i32(const std::string&)>& nameResolver) const {
return AdaptIncomingToDestinationExtImpl(incoming, dstSchema, checker, nameResolver, DifferentColumnTypesPolicy, AbsentColumnPolicy);
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/formats/arrow/process_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace NKikimr::NArrow {

class TSchemaSubset;
class TSchemaLite;
class TSchemaLiteView;

class TColumnOperator {
public:
Expand Down Expand Up @@ -59,7 +60,7 @@ class TColumnOperator {
}

TConclusion<std::shared_ptr<arrow::RecordBatch>> AdaptIncomingToDestinationExt(const std::shared_ptr<arrow::RecordBatch>& incoming,
const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const TSchemaLiteView& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const std::function<i32(const std::string&)>& nameResolver) const;

std::shared_ptr<arrow::RecordBatch> Extract(
Expand All @@ -73,7 +74,7 @@ class TColumnOperator {
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);

TConclusion<TSchemaSubset> BuildSequentialSubset(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema);
const std::shared_ptr<arrow::RecordBatch>& incoming, const NArrow::TSchemaLiteView& dstSchema);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
}

auto schemaSnapshot = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema();
auto schema = schemaSnapshot->GetSchema();
auto index = schemaSnapshot->GetColumnIdOptional(columnName);
if (!index) {
return TTxController::TProposeResult(
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class TPathFieldsInfo {
if (!Schemas.contains(data.GetSchemaVersion())) {
Schemas.emplace(data.GetSchemaVersion(), blobSchema);
}
std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().GetColumnIds(false));
auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end());
if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) {
filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
}
Expand Down Expand Up @@ -245,8 +246,10 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
std::shared_ptr<NArrow::TGeneralContainer> batch;
{
const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange);

auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
auto batchSchema =
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().ArrowSchema()->fields()));
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end()));
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
std::set<ui32> columnIdsToDelete = blobSchema->GetColumnIdsToDelete(resultSchema);
if (!columnIdsToDelete.empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class TReadMetadataBase {

ISnapshotSchema::TPtr GetLoadSchemaVerified(const TPortionInfo& porition) const;

const std::shared_ptr<NArrow::TSchemaLite>& GetBlobSchema(const ui64 version) const {
NArrow::TSchemaLiteView GetBlobSchema(const ui64 version) const {
return GetIndexVersions().GetSchemaVerified(version)->GetIndexInfo().ArrowSchema();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,6 @@ class TReadMetadata: public TReadMetadataBase {
TConclusionStatus Init(
const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor);

// Returns the names of the result schema's fields, in the order the schema lists them.
std::vector<std::string> GetColumnsOrder() const {
    const auto schema = GetResultSchema();
    // Bind the intermediate objects to named references: if GetSchema() or fields() return by value,
    // this keeps them alive for the whole loop (temporaries inside a range-for initializer
    // are destroyed before the body runs prior to C++23).
    const auto& arrowSchema = schema->GetSchema();
    const auto& fields = arrowSchema->fields();

    std::vector<std::string> result;
    // One allocation instead of repeated growth while appending.
    result.reserve(fields.size());
    for (const auto& field : fields) {
        result.emplace_back(field->name());
    }
    return result;
}

std::set<ui32> GetEarlyFilterColumnIds() const;
std::set<ui32> GetPKColumnIds() const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>&
AFL_VERIFY(GetStageData().GetBlobs().size() == 1);
auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first);
auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion());
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema->fields())));
AFL_VERIFY(rBatch)("schema", schema->ToString());
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema.begin(), schema.end())));
AFL_VERIFY(rBatch)("schema", schema.ToString());
auto batch = std::make_shared<NArrow::TGeneralContainer>(rBatch);
std::set<ui32> columnIdsToDelete = batchSchema->GetColumnIdsToDelete(resultSchema);
if (!columnIdsToDelete.empty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#include "column_ids.h"

namespace NKikimr::NOlap {}
48 changes: 48 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/abstract/column_ids.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <ydb/library/actors/core/log.h>
#include <ydb/library/formats/arrow/common/iterator.h>

#include <util/generic/noncopyable.h>
#include <util/system/types.h>

#include <span>

namespace NKikimr::NOlap {

class TColumnIdsView: private TNonCopyable {
private:
std::span<const ui32> ColumnIds;

class TIterator: public NArrow::NUtil::TRandomAccessIteratorClone<std::span<const ui32>::iterator, TIterator> {
using TBase = NArrow::NUtil::TRandomAccessIteratorClone<std::span<const ui32>::iterator, TIterator>;

public:
using TBase::TRandomAccessIteratorClone;
};

public:
template <typename It>
TColumnIdsView(const It begin, const It end)
: ColumnIds(begin, end) {
}

TIterator begin() const {
return ColumnIds.begin();
}

TIterator end() const {
return ColumnIds.end();
}

ui32 operator[](size_t idx) const {
AFL_VERIFY(idx < ColumnIds.size())("idx", idx)("size", ColumnIds.size());
return ColumnIds[idx];
}

ui64 size() const {
return ColumnIds.size();
}
};

} // namespace NKikimr::NOlap
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,9 @@ class IIndexInfo {
return result;
}

[[nodiscard]] static std::vector<ui32> AddSpecialFieldIds(const std::vector<ui32>& baseColumnIds) {
std::vector<ui32> result = baseColumnIds;
static void AddSpecialFieldIds(std::vector<ui32>& baseColumnIds) {
const auto& cIds = GetSystemColumnIds();
result.insert(result.end(), cIds.begin(), cIds.end());
return result;
baseColumnIds.insert(baseColumnIds.end(), cIds.begin(), cIds.end());
}

[[nodiscard]] static std::set<ui32> AddSpecialFieldIds(const std::set<ui32>& baseColumnIds) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/scheme/abstract/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ LIBRARY()

SRCS(
index_info.cpp
column_ids.cpp
)

PEERDIR(
Expand Down
Loading
Loading