Skip to content

Commit

Permalink
Merge aa03f90 into be585b6
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Dec 13, 2024
2 parents be585b6 + aa03f90 commit ce7ff93
Show file tree
Hide file tree
Showing 19 changed files with 160 additions and 178 deletions.
37 changes: 17 additions & 20 deletions ydb/core/formats/arrow/process_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,24 +207,23 @@ NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
return ReorderImpl(incoming, columnNames);
}
namespace {
template <class TDataContainer, class TSchemaImpl>
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TDataContainer>& srcBatch,
const std::shared_ptr<TSchemaImpl>& dstSchema, const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) {
template <class TDataContainer>
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TDataContainer>& srcBatch, const TSchemaLiteView& dstSchema,
const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) {
AFL_VERIFY(srcBatch);
AFL_VERIFY(dstSchema);
if (dstSchema->num_fields() < srcBatch->schema()->num_fields()) {
if (dstSchema.num_fields() < srcBatch->schema()->num_fields()) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns set: destination must been wider than source")(
"source", srcBatch->schema()->ToString())("destination", dstSchema->ToString());
"source", srcBatch->schema()->ToString())("destination", dstSchema.ToString());
return TConclusionStatus::Fail("incorrect columns set: destination must been wider than source");
}
std::set<ui32> fieldIdx;
auto itSrc = srcBatch->schema()->fields().begin();
auto itDst = dstSchema->fields().begin();
while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema->fields().end()) {
auto itDst = dstSchema.fields().begin();
while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema.fields().end()) {
if ((*itSrc)->name() != (*itDst)->name()) {
++itDst;
} else {
fieldIdx.emplace(itDst - dstSchema->fields().begin());
fieldIdx.emplace(itDst - dstSchema.fields().begin());
if (checkFieldTypesPolicy != TColumnOperator::ECheckFieldTypesPolicy::Ignore && (*itDst)->Equals(*itSrc)) {
switch (checkFieldTypesPolicy) {
case TColumnOperator::ECheckFieldTypesPolicy::Error: {
Expand All @@ -245,25 +244,24 @@ TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TData
++itSrc;
}
}
if (itDst == dstSchema->fields().end() && itSrc != srcBatch->schema()->fields().end()) {
if (itDst == dstSchema.fields().end() && itSrc != srcBatch->schema()->fields().end()) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns order in source set")("source", srcBatch->schema()->ToString())(
"destination", dstSchema->ToString());
"destination", dstSchema.ToString());
return TConclusionStatus::Fail("incorrect columns order in source set");
}
return TSchemaSubset(fieldIdx, dstSchema->num_fields());
return TSchemaSubset(fieldIdx, dstSchema.num_fields());
}
} // namespace

TConclusion<TSchemaSubset> TColumnOperator::BuildSequentialSubset(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema) {
const std::shared_ptr<arrow::RecordBatch>& incoming, const NArrow::TSchemaLiteView& dstSchema) {
return BuildSequentialSubsetImpl(incoming, dstSchema, DifferentColumnTypesPolicy);
}
namespace {
template <class TDataContainer>
TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(const std::shared_ptr<TDataContainer>& incoming,
const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const std::function<i32(const std::string&)>& nameResolver,
const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
const TSchemaLiteView& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const std::function<i32(const std::string&)>& nameResolver, const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
const TColumnOperator::EAbsentFieldPolicy absentColumnPolicy) {
struct TFieldData {
ui32 Index;
Expand All @@ -273,14 +271,13 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(c
}
};
AFL_VERIFY(incoming);
AFL_VERIFY(dstSchema);
std::vector<TFieldData> resultColumns;
resultColumns.reserve(incoming->num_columns());
ui32 idx = 0;
for (auto& srcField : incoming->schema()->fields()) {
const int dstIndex = nameResolver(srcField->name());
if (dstIndex > -1) {
const auto& dstField = dstSchema->GetFieldByIndexVerified(dstIndex);
const auto& dstField = dstSchema.GetFieldByIndexVerified(dstIndex);
switch (differentColumnTypesPolicy) {
case TColumnOperator::ECheckFieldTypesPolicy::Verify:
AFL_VERIFY(dstField->type()->Equals(srcField->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
Expand Down Expand Up @@ -322,14 +319,14 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(c
columns.reserve(resultColumns.size());
fields.reserve(resultColumns.size());
for (auto&& i : resultColumns) {
fields.emplace_back(dstSchema->field(i.Index));
fields.emplace_back(dstSchema.field(i.Index));
columns.emplace_back(i.Column);
}
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), incoming->num_rows());
}
} // namespace
TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::AdaptIncomingToDestinationExt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<TSchemaLite>& dstSchema,
const std::shared_ptr<arrow::RecordBatch>& incoming, const TSchemaLiteView& dstSchema,
const std::function<TConclusionStatus(const ui32, const i32)>& checker, const std::function<i32(const std::string&)>& nameResolver) const {
return AdaptIncomingToDestinationExtImpl(incoming, dstSchema, checker, nameResolver, DifferentColumnTypesPolicy, AbsentColumnPolicy);
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/formats/arrow/process_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace NKikimr::NArrow {

class TSchemaSubset;
class TSchemaLite;
class TSchemaLiteView;

class TColumnOperator {
public:
Expand Down Expand Up @@ -59,7 +60,7 @@ class TColumnOperator {
}

TConclusion<std::shared_ptr<arrow::RecordBatch>> AdaptIncomingToDestinationExt(const std::shared_ptr<arrow::RecordBatch>& incoming,
const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const TSchemaLiteView& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const std::function<i32(const std::string&)>& nameResolver) const;

std::shared_ptr<arrow::RecordBatch> Extract(
Expand All @@ -73,7 +74,7 @@ class TColumnOperator {
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);

TConclusion<TSchemaSubset> BuildSequentialSubset(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema);
const std::shared_ptr<arrow::RecordBatch>& incoming, const NArrow::TSchemaLiteView& dstSchema);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
}

auto schemaSnapshot = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema();
auto schema = schemaSnapshot->GetSchema();
auto index = schemaSnapshot->GetColumnIdOptional(columnName);
if (!index) {
return TTxController::TProposeResult(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
{
const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange);
auto batchSchema =
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().ArrowSchema()->fields()));
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().ArrowSchema().fields()));
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
std::set<ui32> columnIdsToDelete = blobSchema->GetColumnIdsToDelete(resultSchema);
if (!columnIdsToDelete.empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class TReadMetadataBase {

ISnapshotSchema::TPtr GetLoadSchemaVerified(const TPortionInfo& porition) const;

const std::shared_ptr<NArrow::TSchemaLite>& GetBlobSchema(const ui64 version) const {
NArrow::TSchemaLiteView GetBlobSchema(const ui64 version) const {
return GetIndexVersions().GetSchemaVerified(version)->GetIndexInfo().ArrowSchema();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,6 @@ class TReadMetadata: public TReadMetadataBase {
TConclusionStatus Init(
const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor);

std::vector<std::string> GetColumnsOrder() const {
auto schema = GetResultSchema();
std::vector<std::string> result;
for (auto&& i : schema->GetSchema()->fields()) {
result.emplace_back(i->name());
}
return result;
}

std::set<ui32> GetEarlyFilterColumnIds() const;
std::set<ui32> GetPKColumnIds() const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>&
AFL_VERIFY(GetStageData().GetBlobs().size() == 1);
auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first);
auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion());
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema->fields())));
AFL_VERIFY(rBatch)("schema", schema->ToString());
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema.fields())));
AFL_VERIFY(rBatch)("schema", schema.ToString());
auto batch = std::make_shared<NArrow::TGeneralContainer>(rBatch);
std::set<ui32> columnIdsToDelete = batchSchema->GetColumnIdsToDelete(resultSchema);
if (!columnIdsToDelete.empty()) {
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,9 @@ class IIndexInfo {
return result;
}

[[nodiscard]] static std::vector<ui32> AddSpecialFieldIds(const std::vector<ui32>& baseColumnIds) {
std::vector<ui32> result = baseColumnIds;
static void AddSpecialFieldIds(std::vector<ui32>& baseColumnIds) {
const auto& cIds = GetSystemColumnIds();
result.insert(result.end(), cIds.begin(), cIds.end());
return result;
baseColumnIds.insert(baseColumnIds.end(), cIds.begin(), cIds.end());
}

[[nodiscard]] static std::set<ui32> AddSpecialFieldIds(const std::set<ui32>& baseColumnIds) {
Expand Down
Loading

0 comments on commit ce7ff93

Please sign in to comment.