From d3cff2e9bb86dddf49389457adcd172449934c2c Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Tue, 1 Oct 2024 15:22:44 +0000 Subject: [PATCH] Fix #9889 and add tests --- .github/config/muted_ya.txt | 1 - ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 141 +++++++++++++++++- .../engines/changes/indexation.cpp | 12 +- .../reader/plain_reader/iterator/source.cpp | 5 +- .../scheme/versions/abstract_scheme.cpp | 30 ++-- .../engines/scheme/versions/abstract_scheme.h | 3 +- 6 files changed, 174 insertions(+), 18 deletions(-) diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 6a0ed1ba02c2..0885992995c5 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -19,7 +19,6 @@ ydb/core/kqp/ut/query KqpQuery.QueryTimeout ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage ydb/core/kqp/ut/scheme [*/*]* ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns -ydb/core/kqp/ut/scheme KqpOlapScheme.DropColumnAfterInsert ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter ydb/core/kqp/ut/service [*/*]* diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 3362c381db33..7d688c515ee7 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -8354,13 +8354,152 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.BulkUpsert(testTable, batch); auto alterQueryAdd = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` DROP COLUMN int_column;"; - Cerr << alterQueryAdd << Endl; auto alterAddResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryAdd).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterAddResult.GetStatus(), EStatus::SUCCESS, alterAddResult.GetIssues().ToString()); csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation); csController->WaitIndexation(TDuration::Seconds(5)); } + + void TestInsertAddInsertDrop( + bool autoIndexation, bool 
indexationAfterInsertAddColumn, bool indexationAfterInsertDropColumn, bool indexationInEnd) { + using namespace NArrow; + + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + if (!autoIndexation) { + csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation); + } + + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(runnerSettings); + + TVector schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("int_column").SetType(NScheme::NTypeIds::Int32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "id" }).SetSchema(schema); + testHelper.CreateTable(testTable); + + TVector dataBuilders; + dataBuilders.push_back( + NConstruction::TSimpleArrayConstructor>::BuildNotNullable("id", false)); + dataBuilders.push_back( + std::make_shared>>("int_column")); + auto batch = NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100); + + for (ui32 i = 0; i < 5; i++) { + testHelper.BulkUpsert(testTable, batch); + auto alterQueryAdd = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN column" << i << " Uint64;"; + auto alterAddResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryAdd).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterAddResult.GetStatus(), EStatus::SUCCESS, alterAddResult.GetIssues().ToString()); + + if (!autoIndexation && indexationAfterInsertAddColumn) { + csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation); + csController->WaitIndexation(TDuration::Seconds(5)); + csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation); + } + + testHelper.BulkUpsert(testTable, batch); + auto alterQueryDrop = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` DROP COLUMN column" << i << ";"; + auto 
alterDropResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryDrop).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterDropResult.GetStatus(), EStatus::SUCCESS, alterDropResult.GetIssues().ToString()); + + if (!autoIndexation && indexationAfterInsertDropColumn) { + csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation); + csController->WaitIndexation(TDuration::Seconds(5)); + csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation); + } + } + + if (!autoIndexation && indexationInEnd) { + csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation); + csController->WaitIndexation(TDuration::Seconds(5)); + } + } + + Y_UNIT_TEST(InsertAddInsertDrop) { + TestInsertAddInsertDrop(true, false, false, false); + for (i32 i = 0; i < 8; i++) { + TestInsertAddInsertDrop(false, i & 1, i & 2, i & 4); /* bits 0..2 of i pick the three manual-indexation flags, so 0..7 covers every combination */ + } + } + + Y_UNIT_TEST(DropTableAfterInsert) { + using namespace NArrow; + + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation); + + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(runnerSettings); + + TVector schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("int_column").SetType(NScheme::NTypeIds::Int32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "id" }).SetSchema(schema); + testHelper.CreateTable(testTable); + + TVector dataBuilders; + dataBuilders.push_back( + NConstruction::TSimpleArrayConstructor>::BuildNotNullable("id", false)); + dataBuilders.push_back( + std::make_shared>>("int_column")); + auto batch = NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100); + + testHelper.BulkUpsert(testTable, batch); + + auto alterQueryDrop = TStringBuilder() <<
"DROP TABLE `" << testTable.GetName() << "`;"; + auto alterDropResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryDrop).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterDropResult.GetStatus(), EStatus::SUCCESS, alterDropResult.GetIssues().ToString()); + + csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation); + csController->WaitIndexation(TDuration::Seconds(5)); + } + + Y_UNIT_TEST(InsertDropAddColumn) { + using namespace NArrow; + + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation); + + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(runnerSettings); + + TVector schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("int_column").SetType(NScheme::NTypeIds::Int32).SetNullable(true) + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "id" }).SetSchema(schema); + testHelper.CreateTable(testTable); + + TVector dataBuilders; + dataBuilders.push_back( + NConstruction::TSimpleArrayConstructor>::BuildNotNullable("id", false)); + dataBuilders.push_back( + std::make_shared>>("int_column")); + auto batch = NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100); + + testHelper.BulkUpsert(testTable, batch); + + auto alterQueryDrop = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` DROP COLUMN int_column;"; + auto alterDropResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryDrop).GetValueSync(); + + auto alterQueryAdd = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN int_column Int32;"; + auto alterAddResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryAdd).GetValueSync(); + + csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation); + 
csController->WaitIndexation(TDuration::Seconds(5)); + } } Y_UNIT_TEST_SUITE(KqpOlapTypes) { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index edce92470ad9..287e1a26508b 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -104,6 +104,7 @@ class TPathFieldsInfo { return; } auto blobSchema = context.SchemaVersions.GetSchemaVerified(data.GetSchemaVersion()); + std::set columnIdxsToDelete = blobSchema->GetColumnIdxsToDelete(ResultSchema); if (!Schemas.contains(data.GetSchemaVersion())) { Schemas.emplace(data.GetSchemaVersion(), blobSchema); } @@ -111,7 +112,11 @@ class TPathFieldsInfo { if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) { filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG); } - UsageColumnIds.insert(filteredIds.begin(), filteredIds.end()); + for (const auto& filteredId : filteredIds) { + if (!columnIdxsToDelete.contains(filteredId)) { + UsageColumnIds.insert(filteredId); + } + } } }; @@ -242,7 +247,10 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont auto batchSchema = std::make_shared(inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().ArrowSchema()->fields())); batch = std::make_shared(NArrow::DeserializeBatch(blobData, batchSchema)); - blobSchema->AdaptBatchToSchema(*batch, resultSchema); + std::set columnIdxToDelete = blobSchema->GetColumnIdxsToDelete(resultSchema); + if (!columnIdxToDelete.empty()) { + batch->DeleteFieldsByIndex(blobSchema->ConvertColumnIdxsToIndexes(columnIdxToDelete)); + } } IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot(), (ui64)inserted.GetInsertWriteId()); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index 5e4d80fbfe43..9e3f80a51513 100644 --- 
a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -224,7 +224,10 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr& auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared(CommittedBlob.GetSchemaSubset().Apply(schema->fields()))); AFL_VERIFY(rBatch)("schema", schema->ToString()); auto batch = std::make_shared(rBatch); - batchSchema->AdaptBatchToSchema(*batch, resultSchema); + std::set columnIdxsToDelete = batchSchema->GetColumnIdxsToDelete(resultSchema); + if (!columnIdxsToDelete.empty()) { + batch->DeleteFieldsByIndex(batchSchema->ConvertColumnIdxsToIndexes(columnIdxsToDelete)); + } TSnapshot ss = TSnapshot::Zero(); if (CommittedBlob.IsCommitted()) { ss = CommittedBlob.GetCommittedSnapshotVerified(); diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp index b302a847e458..3aae9b6ebea1 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp @@ -133,20 +133,26 @@ TConclusion> ISnapshotSchema::PrepareForModi return batch; } -void ISnapshotSchema::AdaptBatchToSchema(NArrow::TGeneralContainer& batch, const ISnapshotSchema::TPtr& targetSchema) const { - if (targetSchema->GetVersion() != GetVersion()) { - std::vector columnIdxToDelete; - for (size_t columnIdx = 0; columnIdx < batch.GetSchema()->GetFields().size(); ++columnIdx) { - const std::optional targetColumnId = targetSchema->GetColumnIdOptional(batch.GetSchema()->field(columnIdx)->name()); - const ui32 batchColumnId = GetColumnIdVerified(GetFieldByIndex(columnIdx)->name()); - if (!targetColumnId || *targetColumnId != batchColumnId) { - columnIdxToDelete.emplace_back(columnIdx); - } - } - if (!columnIdxToDelete.empty()) { - batch.DeleteFieldsByIndex(columnIdxToDelete); +std::set 
ISnapshotSchema::GetColumnIdxsToDelete(const ISnapshotSchema::TPtr& targetSchema) const { + if (targetSchema->GetVersion() == GetVersion()) { + return {}; + } + std::set columnIdxsToDelete; + for (const auto& columnIdx : GetColumnIds()) { + const std::optional targetColumnId = targetSchema->GetColumnIdOptional(GetFieldByColumnIdOptional(columnIdx)->name()); + if (!targetColumnId || *targetColumnId != columnIdx) { + columnIdxsToDelete.emplace(columnIdx); } } + return columnIdxsToDelete; +} + +std::vector ISnapshotSchema::ConvertColumnIdxsToIndexes(const std::set& idxs) const { + std::vector columnIndexes; + for (const auto& id : idxs) { + columnIndexes.emplace_back(GetFieldIndex(id)); + } + return columnIndexes; } ui32 ISnapshotSchema::GetColumnId(const std::string& columnName) const { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h index 962989d75fb2..d12a15e98897 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h @@ -77,7 +77,8 @@ class ISnapshotSchema { const ISnapshotSchema& dataSchema, const std::shared_ptr& batch, const std::set& restoreColumnIds) const; [[nodiscard]] TConclusion> PrepareForModification( const std::shared_ptr& incomingBatch, const NEvWrite::EModificationType mType) const; - void AdaptBatchToSchema(NArrow::TGeneralContainer& batch, const ISnapshotSchema::TPtr& targetSchema) const; + std::set GetColumnIdxsToDelete(const ISnapshotSchema::TPtr& targetSchema) const; + std::vector ConvertColumnIdxsToIndexes(const std::set& idxs) const; }; } // namespace NKikimr::NOlap