Skip to content

Commit

Permalink
remove blob columns not present in current scheme
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Sep 4, 2024
1 parent 766ee48 commit 65e10f5
Show file tree
Hide file tree
Showing 15 changed files with 161 additions and 14 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme [*/*]*
ydb/core/kqp/ut/scheme KqpOlapScheme.DropThenAddColumn
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/formats/arrow/common/container.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "container.h"

#include <ydb/core/formats/arrow/common/vector_operations.h>
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/simple_arrays_cache.h>
Expand Down Expand Up @@ -59,6 +60,11 @@ TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr<arrow::Field
return AddField(f, std::make_shared<NAccessor::TTrivialArray>(data));
}

void TGeneralContainer::DeleteFieldsByIndex(const std::vector<ui32>& idxs) {
Schema->DeleteFieldsByIndex(idxs);
NUtil::EraseItems(Columns, idxs);
}

void TGeneralContainer::Initialize() {
std::optional<ui64> recordsCount;
AFL_VERIFY(Schema->num_fields() == (i32)Columns.size())("schema", Schema->num_fields())("columns", Columns.size());
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/formats/arrow/common/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ class TGeneralContainer {

[[nodiscard]] TConclusionStatus AddField(const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<arrow::ChunkedArray>& data);

void DeleteFieldsByIndex(const std::vector<ui32>& idxs);

TGeneralContainer(const std::shared_ptr<arrow::Table>& table);
TGeneralContainer(const std::shared_ptr<arrow::RecordBatch>& table);
TGeneralContainer(const std::shared_ptr<arrow::Schema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns);
Expand Down
54 changes: 54 additions & 0 deletions ydb/core/formats/arrow/common/vector_operations.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include <ydb/library/actors/core/log.h>

#include <util/system/types.h>

#include <vector>

namespace NKikimr::NArrow::NUtil {

// Policy with empty hooks, used by EraseItems when the caller has no
// bookkeeping to maintain while elements are dropped or shifted.
template <typename T>
class TDefaultErasePolicy {
public:
    // Hook for an element that is being erased; intentionally empty.
    void OnEraseItem(const T& /*item*/) const {}

    // Hook for an element that now lives at position `new_index`; intentionally empty.
    void OnMoveItem(const T& /*item*/, const ui64 /*new_index*/) const {}
};

template <typename T, typename ErasePolicy = TDefaultErasePolicy<T>>
void EraseItems(std::vector<T>& container, const std::vector<ui32>& idxsToErase, const ErasePolicy& policy = TDefaultErasePolicy<T>()) {
if (idxsToErase.empty()) {
return;
}
AFL_VERIFY(idxsToErase.front() < container.size());

auto itNextEraseIdx = idxsToErase.begin();
ui64 writeIdx = idxsToErase.front();
ui64 readIdx = idxsToErase.front();
while (readIdx != container.size()) {
AFL_VERIFY(itNextEraseIdx != idxsToErase.end() && readIdx == *itNextEraseIdx);

policy.OnEraseItem(container[readIdx]);
++readIdx;
++itNextEraseIdx;
if (itNextEraseIdx != idxsToErase.end()) {
AFL_VERIFY(*itNextEraseIdx > *std::prev(itNextEraseIdx));
AFL_VERIFY(*itNextEraseIdx < container.size());
}

const ui64 nextReadIdx = itNextEraseIdx == idxsToErase.end() ? container.size() : *itNextEraseIdx;
while (readIdx != nextReadIdx) {
std::swap(container[writeIdx], container[readIdx]);
policy.OnMoveItem(container[writeIdx], writeIdx);
++writeIdx;
++readIdx;
}
}

container.resize(writeIdx);
AFL_VERIFY(itNextEraseIdx == idxsToErase.end());
}

} // namespace NKikimr::NArrow::NUtil
7 changes: 7 additions & 0 deletions ydb/core/formats/arrow/modifier/schema.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "schema.h"
#include <util/string/builder.h>
#include <ydb/core/formats/arrow/common/vector_operations.h>
#include <ydb/library/actors/core/log.h>

namespace NKikimr::NArrow::NModifier {
Expand Down Expand Up @@ -29,6 +30,12 @@ TConclusionStatus TSchema::AddField(const std::shared_ptr<arrow::Field>& f) {
return TConclusionStatus::Success();
}

void TSchema::DeleteFieldsByIndex(const std::vector<ui32>& idxs) {
AFL_VERIFY(Initialized);
AFL_VERIFY(!Finished);
NUtil::EraseItems(Fields, idxs, TFieldsErasePolicy(this));
}

TString TSchema::ToString() const {
TStringBuilder result;
for (auto&& i : Fields) {
Expand Down
23 changes: 23 additions & 0 deletions ydb/core/formats/arrow/modifier/schema.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <ydb/library/actors/core/log.h>
#include <ydb/library/conclusion/status.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
#include <util/generic/hash.h>
Expand Down Expand Up @@ -39,6 +40,7 @@ class TSchema {
std::shared_ptr<arrow::Schema> Finish();
[[nodiscard]] TConclusionStatus AddField(const std::shared_ptr<arrow::Field>& f);
const std::shared_ptr<arrow::Field>& GetFieldByName(const std::string& name) const;
void DeleteFieldsByIndex(const std::vector<ui32>& idxs);

bool HasField(const std::string& name) const {
return IndexByName.contains(name);
Expand All @@ -51,5 +53,26 @@ class TSchema {
const std::shared_ptr<arrow::Field>& GetFieldVerified(const ui32 index) const;

const std::shared_ptr<arrow::Field>& field(const ui32 index) const;

private:
// Erase policy that keeps the owning schema's IndexByName map consistent
// while fields are erased from, and shifted within, the field list.
class TFieldsErasePolicy {
private:
    // Schema whose IndexByName map is updated; not owned.
    TSchema* const Owner;

public:
    TFieldsErasePolicy(TSchema* const owner)
        : Owner(owner)
    {
    }

    // Forgets the index entry of a field that is being erased.
    void OnEraseItem(const std::shared_ptr<arrow::Field>& item) const {
        const auto& fieldName = item->name();
        Owner->IndexByName.erase(fieldName);
    }

    // Records the new position of a field that has been shifted.
    // The field must already be present in the index map (verified).
    void OnMoveItem(const std::shared_ptr<arrow::Field>& item, const ui64 new_index) const {
        const auto& fieldName = item->name();
        auto* findField = Owner->IndexByName.FindPtr(fieldName);
        AFL_VERIFY(findField);
        *findField = new_index;
    }
};
};
}
46 changes: 34 additions & 12 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7849,7 +7849,11 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]");
}

Y_UNIT_TEST(DropThenAddColumn) {
void TestDropThenAddColumn(bool enableIndexation, bool enableCompaction) {
if (enableCompaction) {
Y_ABORT_UNLESS(enableIndexation);
}

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->DisableBackground(NYDBTest::ICSController::EBackground::Compaction);
Expand All @@ -7874,12 +7878,14 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.BulkUpsert(testTable, tableInserter);
}

csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->EnableBackground(NYDBTest::ICSController::EBackground::Compaction);
csController->WaitIndexation(TDuration::Seconds(5));
csController->WaitCompactions(TDuration::Seconds(5));
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->DisableBackground(NYDBTest::ICSController::EBackground::Compaction);
if (enableCompaction) {
csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->EnableBackground(NYDBTest::ICSController::EBackground::Compaction);
csController->WaitIndexation(TDuration::Seconds(5));
csController->WaitCompactions(TDuration::Seconds(5));
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->DisableBackground(NYDBTest::ICSController::EBackground::Compaction);
}

{
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` DROP COLUMN value;";
Expand All @@ -7900,12 +7906,28 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.BulkUpsert(testTable, tableInserter);
}

csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->EnableBackground(NYDBTest::ICSController::EBackground::Compaction);
csController->WaitIndexation(TDuration::Seconds(5));
csController->WaitCompactions(TDuration::Seconds(5));
if (enableIndexation) {
csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->WaitIndexation(TDuration::Seconds(5));
}
if (enableCompaction) {
csController->EnableBackground(NYDBTest::ICSController::EBackground::Compaction);
csController->WaitCompactions(TDuration::Seconds(5));
}

testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest`", "[[#];[#];[[42u]];[[43u]]]");
}

// Drop-then-add scenario with both background indexation and compaction left disabled.
Y_UNIT_TEST(DropThenAddColumn) {
    const bool enableIndexation = false;
    const bool enableCompaction = false;
    TestDropThenAddColumn(enableIndexation, enableCompaction);
}

// Drop-then-add scenario with background indexation enabled but compaction
// left disabled, so the indexation-only path is covered separately from
// DropThenAddColumnCompaction (which enables both).
// Previously this passed (true, true), making it an exact duplicate of the
// compaction test.
Y_UNIT_TEST(DropThenAddColumnIndexation) {
    TestDropThenAddColumn(/*enableIndexation=*/true, /*enableCompaction=*/false);
}

testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest`", "[[4;#;[\"test_res_1\"]]]");
// Drop-then-add scenario with both background indexation and compaction enabled.
Y_UNIT_TEST(DropThenAddColumnCompaction) {
    const bool enableIndexation = true;
    const bool enableCompaction = true;
    TestDropThenAddColumn(enableIndexation, enableCompaction);
}

Y_UNIT_TEST(DropTtlColumn) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
auto batchSchema =
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().ArrowSchema()->fields()));
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
blobSchema->AdaptBatchToSchema(*batch, resultSchema);
}
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,18 +215,21 @@ bool TCommittedDataSource::DoStartFetchingColumns(

// Builds the in-memory batch for a committed blob (if the stage data has no
// table yet) and then synchronises the stage table's columns with `columns`.
void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) {
TMemoryProfileGuard mGuard("SCAN_PROFILE::ASSEMBLER::COMMITTED", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
// Schema the blob was written with (looked up by the committed blob's schema version)
// and the schema the read result must conform to.
const ISnapshotSchema::TPtr batchSchema = GetContext()->GetReadMetadata()->GetIndexVersions().GetSchemaVerified(GetCommitted().GetSchemaVersion());
const ISnapshotSchema::TPtr resultSchema = GetContext()->GetReadMetadata()->GetResultSchema();
if (!GetStageData().GetTable()) {
// Exactly one fetched blob is expected for a committed data source.
AFL_VERIFY(GetStageData().GetBlobs().size() == 1);
auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first);
auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion());
// The blob is deserialized against the subset of the blob schema recorded for it.
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema->fields())));
AFL_VERIFY(rBatch)("schema", schema->ToString());
auto batch = std::make_shared<NArrow::TGeneralContainer>(rBatch);
// Drop columns of the blob that the result schema does not contain (or contains under a different column id).
batchSchema->AdaptBatchToSchema(*batch, resultSchema);
GetContext()->GetReadMetadata()->GetIndexInfo().AddSnapshotColumns(*batch, CommittedBlob.GetSnapshotDef(TSnapshot::Zero()));
GetContext()->GetReadMetadata()->GetIndexInfo().AddDeleteFlagsColumn(*batch, CommittedBlob.GetIsDelete());
MutableStageData().AddBatch(batch);
}
// NOTE(review): the two consecutive SyncTableColumns calls below look like the
// before/after sides of a diff hunk (the second one reuses `resultSchema`);
// confirm that only one of them is meant to remain.
MutableStageData().SyncTableColumns(columns->GetSchema()->fields(), *GetContext()->GetReadMetadata()->GetResultSchema());
MutableStageData().SyncTableColumns(columns->GetSchema()->fields(), *resultSchema);
}

} // namespace NKikimr::NOlap::NReader::NPlain
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,22 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi
}
}

// Removes from `batch` every column that `targetSchema` does not contain, or
// contains under a different column id than this schema assigns to it (for
// example a column that was dropped and later re-added under the same name).
//
// `batch` must hold data written with this schema; it may contain only a
// subset of this schema's columns. When both schemas have the same version
// nothing is changed.
//
// Both column ids are resolved from the name of the batch's own field. The
// previous code resolved the batch-side id via GetFieldByIndex(columnIdx) on
// this schema, which addresses a different column whenever the batch holds a
// subset of the schema's fields and positions therefore do not line up.
void ISnapshotSchema::AdaptBatchToSchema(NArrow::TGeneralContainer& batch, const ISnapshotSchema::TPtr& targetSchema) const {
    if (targetSchema->GetVersion() == GetVersion()) {
        return;
    }

    std::vector<ui32> columnIdxToDelete;
    for (size_t columnIdx = 0; columnIdx < batch.GetSchema()->GetFields().size(); ++columnIdx) {
        const std::string& columnName = batch.GetSchema()->field(columnIdx)->name();
        const std::optional<ui32> targetColumnId = targetSchema->GetColumnIdOptional(columnName);
        const ui32 batchColumnId = GetColumnIdVerified(columnName);
        if (!targetColumnId || *targetColumnId != batchColumnId) {
            columnIdxToDelete.emplace_back(columnIdx);
        }
    }

    if (!columnIdxToDelete.empty()) {
        batch.DeleteFieldsByIndex(columnIdxToDelete);
    }
}

ui32 ISnapshotSchema::GetColumnId(const std::string& columnName) const {
auto id = GetColumnIdOptional(columnName);
AFL_VERIFY(id)("column_name", columnName)("schema", JoinSeq(",", GetSchema()->field_names()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ISnapshotSchema {
std::vector<std::string> GetPKColumnNames() const;

virtual std::optional<ui32> GetColumnIdOptional(const std::string& columnName) const = 0;
virtual ui32 GetColumnIdVerified(const std::string& columnName) const = 0;
virtual int GetFieldIndex(const ui32 columnId) const = 0;
bool HasColumnId(const ui32 columnId) const {
return GetFieldIndex(columnId) >= 0;
Expand Down Expand Up @@ -76,6 +77,7 @@ class ISnapshotSchema {
const ISnapshotSchema& dataSchema, const std::shared_ptr<NArrow::TGeneralContainer>& batch, const std::set<ui32>& restoreColumnIds) const;
[[nodiscard]] TConclusion<std::shared_ptr<arrow::RecordBatch>> PrepareForModification(
const std::shared_ptr<arrow::RecordBatch>& incomingBatch, const NEvWrite::EModificationType mType) const;
void AdaptBatchToSchema(NArrow::TGeneralContainer& batch, const ISnapshotSchema::TPtr& targetSchema) const;
};

} // namespace NKikimr::NOlap
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ std::optional<ui32> TFilteredSnapshotSchema::GetColumnIdOptional(const std::stri
return result;
}

// Resolves `columnName` through the original (unfiltered) snapshot and
// additionally verifies that the resulting column id is part of this filter.
ui32 TFilteredSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const {
    const ui32 result = OriginalSnapshot->GetColumnIdVerified(columnName);
    AFL_VERIFY(ColumnIds.contains(result));
    return result;
}

int TFilteredSnapshotSchema::GetFieldIndex(const ui32 columnId) const {
if (!ColumnIds.contains(columnId)) {
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class TFilteredSnapshotSchema: public ISnapshotSchema {
TColumnSaver GetColumnSaver(const ui32 columnId) const override;
std::shared_ptr<TColumnLoader> GetColumnLoaderOptional(const ui32 columnId) const override;
std::optional<ui32> GetColumnIdOptional(const std::string& columnName) const override;
ui32 GetColumnIdVerified(const std::string& columnName) const override;
int GetFieldIndex(const ui32 columnId) const override;

const std::shared_ptr<arrow::Schema>& GetSchema() const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ std::optional<ui32> TSnapshotSchema::GetColumnIdOptional(const std::string& colu
return IndexInfo.GetColumnIdOptional(columnName);
}

// Returns the id of the column named `columnName`, delegating the lookup
// (and its verification) to IndexInfo.
ui32 TSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const {
    const ui32 columnId = IndexInfo.GetColumnIdVerified(columnName);
    return columnId;
}

int TSnapshotSchema::GetFieldIndex(const ui32 columnId) const {
const TString& columnName = IndexInfo.GetColumnName(columnId, false);
if (!columnName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class TSnapshotSchema: public ISnapshotSchema {
TColumnSaver GetColumnSaver(const ui32 columnId) const override;
std::shared_ptr<TColumnLoader> GetColumnLoaderOptional(const ui32 columnId) const override;
std::optional<ui32> GetColumnIdOptional(const std::string& columnName) const override;
ui32 GetColumnIdVerified(const std::string& columnName) const override;
int GetFieldIndex(const ui32 columnId) const override;

const std::shared_ptr<arrow::Schema>& GetSchema() const override;
Expand Down

0 comments on commit 65e10f5

Please sign in to comment.