From 2707851459380959b71b14d23d5005e4d3f7b2ef Mon Sep 17 00:00:00 2001 From: Artem Alekseev Date: Tue, 5 Nov 2024 19:47:15 +0300 Subject: [PATCH] Transfer scheme history to new partitions (#9959) --- .github/config/muted_ya.txt | 1 - ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp | 77 +++++++++- ydb/core/protos/counters_columnshard.proto | 1 + ydb/core/tx/columnshard/columnshard_impl.cpp | 143 +++++++++--------- ydb/core/tx/columnshard/columnshard_impl.h | 1 + .../data_sharing/common/session/common.cpp | 14 +- .../data_sharing/common/session/common.h | 19 ++- .../destination/events/transfer.h | 26 ++-- .../destination/session/destination.cpp | 12 +- .../destination/session/destination.h | 11 +- .../transactions/tx_data_from_source.cpp | 24 ++- .../transactions/tx_data_from_source.h | 10 +- .../data_sharing/manager/sessions.cpp | 9 +- .../data_sharing/manager/sessions.h | 2 +- .../data_sharing/protos/events.proto | 2 + .../data_sharing/protos/sessions.proto | 4 + .../data_sharing/protos/transfer.proto | 1 + .../columnshard/data_sharing/protos/ya.make | 2 +- .../data_sharing/source/session/cursor.cpp | 44 +++++- .../data_sharing/source/session/cursor.h | 26 +++- .../data_sharing/source/session/source.cpp | 22 +-- .../data_sharing/source/session/source.h | 39 ++--- .../transactions/tx_data_ack_to_source.cpp | 2 + .../transactions/tx_start_source_cursor.cpp | 40 +++++ .../transactions/tx_start_source_cursor.h | 31 ++++ .../data_sharing/source/transactions/ya.make | 1 + .../tx/columnshard/engines/column_engine.h | 10 ++ .../engines/column_engine_logs.cpp | 31 ++++ .../columnshard/engines/column_engine_logs.h | 1 + .../engines/scheme/schema_version.cpp | 4 + .../engines/scheme/schema_version.h | 46 ++++++ .../engines/scheme/versions/versioned_index.h | 12 +- .../tx/columnshard/engines/scheme/ya.make | 1 + 33 files changed, 492 insertions(+), 177 deletions(-) create mode 100644 ydb/core/tx/columnshard/data_sharing/source/transactions/tx_start_source_cursor.cpp create mode 100644 ydb/core/tx/columnshard/data_sharing/source/transactions/tx_start_source_cursor.h create mode 100644 ydb/core/tx/columnshard/engines/scheme/schema_version.cpp create mode 100644 ydb/core/tx/columnshard/engines/scheme/schema_version.h diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 5ae37139dfea..426f8950b79f 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -23,7 +23,6 @@ ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertViaLegacyScripting-S ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restarts -ydb/core/kqp/ut/olap KqpOlapBlobsSharing.ChangeSchemaAndSplit ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64 ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest diff --git a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp index d4cfb8d85f10..c6a00cdde5b1 100644 --- a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp +++ b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp @@ -321,9 +321,6 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { void Execute() { TLocalHelper(Kikimr).SetShardingMethod(ShardingType).CreateTestOlapTable("olapTable", "olapStore", 24, 4); - - Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD, NKikimrServices::TX_COLUMNSHARD_SCAN }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize(); - { WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000); WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000); @@ -403,7 +400,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { } Y_UNIT_TEST(TableReshardingModuloN) { - TShardingTypeTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute(); + TShardingTypeTest().SetShardingType("HASH_FUNCTION_MODULO_N").Execute(); } class TAsyncReshardingTest: public TReshardingTest { @@ -435,11 +432,36 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { TReshardingTest::CheckCount(NumRows); } + void AddManyColumns() { + auto alterQuery = TStringBuilder() << "ALTER TABLESTORE `/Root/olapStore` "; + for (int i = 0; i < 10000; i++) { + alterQuery << " ADD COLUMN col_" << i << " Int8"; + if (i < 10000 - 1) { + alterQuery << ", "; + } + } + + auto session = TableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + + void RestartAllShards() { + for (i64 id : CSController->GetShardActualIds()) { + Kikimr.GetTestServer().GetRuntime()->Send(MakePipePerNodeCacheID(false), NActors::TActorId(), new TEvPipeCache::TEvForward(new TEvents::TEvPoisonPill(), id, false)); + } + } + void ChangeSchema() { - auto alterQuery = - "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=level, " - "`SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, " - "`COMPRESSION.TYPE`=`zstd`);"; + const char* alterQuery; + if (HasNewCol) { + alterQuery = "ALTER TABLESTORE `/Root/olapStore` DROP COLUMN new_col"; + } else { + alterQuery = "ALTER TABLESTORE `/Root/olapStore` ADD COLUMN new_col Int8"; + } + HasNewCol = !HasNewCol; + auto session = TableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); @@ -454,6 +476,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { ui64 LastPathId = 1000000; ui64 LastTs = 300000000; ui64 NumRows = 0; + ui64 HasNewCol = false; }; Y_UNIT_TEST(UpsertWhileSplitTest) { @@ -498,6 +521,44 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { tester.StartResharding("SPLIT"); tester.WaitResharding(); + tester.RestartAllShards(); + + tester.CheckCount(); + } + + Y_UNIT_TEST(MultipleSchemaVersions) { + TAsyncReshardingTest tester; + tester.DisableCompaction(); + + for (int i = 0; i < 3; i++) { + tester.AddBatch(1); + tester.ChangeSchema(); + } + + tester.StartResharding("SPLIT"); + tester.WaitResharding(); + + tester.RestartAllShards(); + + tester.CheckCount(); + } + + Y_UNIT_TEST(HugeSchemeHistory) { + TAsyncReshardingTest tester; + tester.DisableCompaction(); + + tester.AddManyColumns(); + + for (int i = 0; i < 100; i++) { + tester.AddBatch(1); + tester.ChangeSchema(); + } + + tester.StartResharding("SPLIT"); + tester.WaitResharding(); + + tester.RestartAllShards(); + tester.CheckCount(); } } diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto index 898dac98aad6..f0d358ae2f19 100644 --- a/ydb/core/protos/counters_columnshard.proto +++ b/ydb/core/protos/counters_columnshard.proto @@ -201,4 +201,5 @@ enum ETxTypes { TXTYPE_GC_START = 34 [(TxTypeOpts) = {Name: "TxGarbageCollectionStart"}]; TXTYPE_APPLY_NORMALIZER = 35 [(TxTypeOpts) = {Name: "TxApplyNormalizer"}]; TXTYPE_START_INTERNAL_SCAN = 36 [(TxTypeOpts) = {Name: "TxStartInternalScan"}]; + TXTYPE_DATA_SHARING_START_SOURCE_CURSOR = 37 [(TxTypeOpts) = {Name: "TxDataSharingStartSourceCursor"}]; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 1615f67bb2f3..bb192aef33a2 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -1,52 +1,51 @@ -#include "columnshard_impl.h" #include "blob.h" +#include "columnshard_impl.h" #include "columnshard_schema.h" -#include "common/tablet_id.h" -#include "blobs_reader/task.h" -#include "blobs_reader/events.h" + #include "blobs_action/bs/storage.h" +#include "blobs_reader/events.h" +#include "blobs_reader/task.h" +#include "common/tablet_id.h" #include "resource_subscriber/task.h" #ifndef KIKIMR_DISABLE_S3_OPS #include "blobs_action/tier/storage.h" #endif -#include "blobs_reader/actor.h" +#include "bg_tasks/adapter/adapter.h" +#include "bg_tasks/events/events.h" +#include "bg_tasks/manager/manager.h" #include "blobs_action/storages_manager/manager.h" -#include "blobs_action/transaction/tx_remove_blobs.h" -#include "blobs_action/transaction/tx_gc_insert_table.h" #include "blobs_action/transaction/tx_gc_indexed.h" -#include "bg_tasks/events/events.h" - +#include "blobs_action/transaction/tx_gc_insert_table.h" +#include "blobs_action/transaction/tx_remove_blobs.h" +#include "blobs_reader/actor.h" +#include "data_sharing/common/transactions/tx_extension.h" #include "data_sharing/destination/session/destination.h" #include "data_sharing/source/session/source.h" -#include "data_sharing/common/transactions/tx_extension.h" - -#include "engines/changes/indexation.h" -#include "engines/changes/general_compaction.h" #include "engines/changes/cleanup_portions.h" #include "engines/changes/cleanup_tables.h" +#include "engines/changes/general_compaction.h" +#include "engines/changes/indexation.h" #include "engines/changes/ttl.h" - +#include "hooks/abstract/abstract.h" #include "resource_subscriber/counters.h" #include "transactions/operators/ev_write/sync.h" -#include "bg_tasks/adapter/adapter.h" -#include "bg_tasks/manager/manager.h" -#include "hooks/abstract/abstract.h" - +#include #include #include -#include -#include #include -#include -#include +#include +#include #include #include #include #include -#include +#include +#include + +#include namespace NKikimr::NColumnShard { @@ -54,8 +53,7 @@ namespace NKikimr::NColumnShard { // But in unittests we want to test both scenarios bool gAllowLogBatchingDefaultValue = true; -namespace -{ +namespace { NTabletPipe::TClientConfig GetPipeClientConfig() { NTabletPipe::TClientConfig config; @@ -66,7 +64,7 @@ NTabletPipe::TClientConfig GetPipeClientConfig() { return config; } -} +} // namespace TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) : TActor(&TThis::StateInit) @@ -89,8 +87,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , TTLTaskSubscription(NOlap::TTTLColumnEngineChanges::StaticTypeName(), Counters.GetSubscribeCounters()) , BackgroundController(Counters.GetBackgroundControllerCounters()) , NormalizerController(StoragesManager, Counters.GetSubscribeCounters()) - , SysLocks(this) -{ + , SysLocks(this) { } void TColumnShard::OnDetach(const TActorContext& ctx) { @@ -135,8 +132,7 @@ bool TColumnShard::WaitPlanStep(ui64 step) { } if (MediatorTimeCastRegistered) { if (MediatorTimeCastWaitingSteps.empty() || - step < *MediatorTimeCastWaitingSteps.begin()) - { + step < *MediatorTimeCastWaitingSteps.begin()) { MediatorTimeCastWaitingSteps.insert(step); SendWaitPlanStep(step); LOG_S_DEBUG("Waiting for PlanStep# " << step << " from mediator time cast"); @@ -307,7 +303,7 @@ void TColumnShard::UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExec } void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, - NTabletFlatExecutor::TTransactionContext& txc) { + NTabletFlatExecutor::TTransactionContext& txc) { auto seqNo = SeqNoFromProto(seqNoProto); if (LastSchemaSeqNo <= seqNo) { UpdateSchemaSeqNo(++seqNo, txc); @@ -315,7 +311,7 @@ void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& } void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const NOlap::TSnapshot& version, - NTabletFlatExecutor::TTransactionContext& txc) { + NTabletFlatExecutor::TTransactionContext& txc) { switch (body.TxBody_case()) { case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: { RunInit(body.GetInitShard(), version, txc); @@ -347,7 +343,7 @@ void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, } void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const NOlap::TSnapshot& version, - NTabletFlatExecutor::TTransactionContext& txc) { + NTabletFlatExecutor::TTransactionContext& txc) { Y_UNUSED(version); NIceDb::TNiceDb db(txc.DB); @@ -368,7 +364,7 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const } void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tableProto, const NOlap::TSnapshot& version, - NTabletFlatExecutor::TTransactionContext& txc) { + NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); const ui64 pathId = tableProto.GetPathId(); @@ -378,8 +374,8 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl } LOG_S_DEBUG("EnsureTable for pathId: " << pathId - << " ttl settings: " << tableProto.GetTtlSettings() - << " at tablet " << TabletID()); + << " ttl settings: " << tableProto.GetTtlSettings() + << " at tablet " << TabletID()); NKikimrTxColumnShard::TTableVersionInfo tableVerProto; tableVerProto.SetPathId(pathId); @@ -432,16 +428,16 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl } void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const NOlap::TSnapshot& version, - NTabletFlatExecutor::TTransactionContext& txc) { + NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); const ui64 pathId = alterProto.GetPathId(); Y_ABORT_UNLESS(TablesManager.HasTable(pathId), "AlterTable on a dropped or non-existent table"); LOG_S_DEBUG("AlterTable for pathId: " << pathId - << " schema: " << alterProto.GetSchema() - << " ttl settings: " << alterProto.GetTtlSettings() - << " at tablet " << TabletID()); + << " schema: " << alterProto.GetSchema() + << " ttl settings: " << alterProto.GetTtlSettings() + << " at tablet " << TabletID()); NKikimrTxColumnShard::TTableVersionInfo tableVerProto; std::optional schema; @@ -466,7 +462,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP } void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const NOlap::TSnapshot& version, - NTabletFlatExecutor::TTransactionContext& txc) { + NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); const ui64 pathId = dropProto.GetPathId(); @@ -480,7 +476,7 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt } void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const NOlap::TSnapshot& version, - NTabletFlatExecutor::TTransactionContext& txc) { + NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); if (proto.HasStorePathId()) { @@ -514,7 +510,7 @@ void TColumnShard::EnqueueBackgroundActivities(const bool periodic) { AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("problem", "Background activities cannot be started: no index at tablet"); return; } -// !!!!!! MUST BE FIRST THROUGH DATA HAVE TO BE SAME IN SESSIONS AFTER TABLET RESTART + // !!!!!! MUST BE FIRST THROUGH DATA HAVE TO BE SAME IN SESSIONS AFTER TABLET RESTART SharingSessionsManager->Start(*this); SetupIndexation(); @@ -534,6 +530,7 @@ class TChangesTask: public NConveyor::ITask { const TActorId ParentActorId; TString ClassId; NOlap::TSnapshot LastCompletedTx; + protected: virtual TConclusionStatus DoExecute(const std::shared_ptr& /*taskPtr*/) override { NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent_id", ParentActorId)); @@ -547,6 +544,7 @@ class TChangesTask: public NConveyor::ITask { TActorContext::AsActorContext().Send(ParentActorId, std::move(TxEvent)); return TConclusionStatus::Success(); } + public: virtual TString GetTaskClassIdentifier() const override { return ClassId; @@ -557,8 +555,7 @@ class TChangesTask: public NConveyor::ITask { , Counters(counters) , TabletId(tabletId) , ParentActorId(parentActorId) - , LastCompletedTx(lastCompletedTx) - { + , LastCompletedTx(lastCompletedTx) { Y_ABORT_UNLESS(TxEvent); Y_ABORT_UNLESS(TxEvent->IndexChanges); ClassId = "Changes::ConstructBlobs::" + TxEvent->IndexChanges->TypeString(); @@ -573,6 +570,7 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask { std::unique_ptr TxEvent; TIndexationCounters Counters; NOlap::TSnapshot LastCompletedTx; + protected: virtual void DoOnDataReady(const std::shared_ptr& resourcesGuard) override { TxEvent->IndexChanges->Blobs = ExtractBlobsData(); @@ -587,14 +585,13 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask { } virtual bool DoOnError(const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override { AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)("status", status.GetErrorMessage())("status_code", status.GetStatus()); - AFL_VERIFY(status.GetStatus() != NKikimrProto::EReplyStatus::NODATA)("blob_id", range)("status", status.GetStatus()) - ("error", status.GetErrorMessage())("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier()) - ("debug", TxEvent->IndexChanges->DebugString()); + AFL_VERIFY(status.GetStatus() != NKikimrProto::EReplyStatus::NODATA)("blob_id", range)("status", status.GetStatus())("error", status.GetErrorMessage())("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier())("debug", TxEvent->IndexChanges->DebugString()); TxEvent->SetPutStatus(NKikimrProto::ERROR); Counters.ReadErrors->Add(1); TActorContext::AsActorContext().Send(ParentActorId, std::move(TxEvent)); return false; } + public: TChangesReadTask(std::unique_ptr&& event, const TActorId parentActorId, const ui64 tabletId, const TIndexationCounters& counters, NOlap::TSnapshot lastCompletedTx) : TBase(event->IndexChanges->GetReadingActions(), event->IndexChanges->TypeString(), event->IndexChanges->GetTaskIdentifier()) @@ -602,8 +599,7 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask { , TabletId(tabletId) , TxEvent(std::move(event)) , Counters(counters) - , LastCompletedTx(lastCompletedTx) - { + , LastCompletedTx(lastCompletedTx) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start_changes")("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier()); } }; @@ -611,6 +607,7 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask { class TInsertChangesReadTask: public TChangesReadTask, public TMonitoringObjectsCounter { private: using TBase = TChangesReadTask; + public: using TBase::TBase; }; @@ -618,6 +615,7 @@ class TInsertChangesReadTask: public TChangesReadTask, public TMonitoringObjects class TCompactChangesReadTask: public TChangesReadTask, public TMonitoringObjectsCounter { private: using TBase = TChangesReadTask; + public: using TBase::TBase; }; @@ -625,6 +623,7 @@ class TCompactChangesReadTask: public TChangesReadTask, public TMonitoringObject class TTTLChangesReadTask: public TChangesReadTask, public TMonitoringObjectsCounter { private: using TBase = TChangesReadTask; + public: using TBase::TBase; }; @@ -650,13 +649,12 @@ void TColumnShard::StartIndexTask(std::vector&& da auto ev = std::make_unique(actualIndexInfo, indexChanges, Settings.CacheDataAfterIndexing); const TString externalTaskId = indexChanges->GetTaskIdentifier(); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes) - ("has_more", bytesToIndex >= Limits.MaxInsertBytes)("external_task_id", externalTaskId); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes)("has_more", bytesToIndex >= Limits.MaxInsertBytes)("external_task_id", externalTaskId); NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription( ResourceSubscribeActor, std::make_shared( - std::make_shared(std::move(ev), SelfId(), TabletID(), Counters.GetIndexationCounters(), GetLastCompletedTx()), - 0, indexChanges->CalcMemoryForUsage(), externalTaskId, InsertTaskSubscription)); + std::make_shared(std::move(ev), SelfId(), TabletID(), Counters.GetIndexationCounters(), GetLastCompletedTx()), + 0, indexChanges->CalcMemoryForUsage(), externalTaskId, InsertTaskSubscription)); } void TColumnShard::SetupIndexation() { @@ -666,9 +664,7 @@ void TColumnShard::SetupIndexation() { } BackgroundController.CheckDeadlinesIndexation(); if (BackgroundController.GetIndexingActiveCount()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "in_progress") - ("count", BackgroundController.GetIndexingActiveCount())("insert_overload_size", InsertTable->GetCountersCommitted().Bytes) - ("indexing_debug", BackgroundController.DebugStringIndexation()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "in_progress")("count", BackgroundController.GetIndexingActiveCount())("insert_overload_size", InsertTable->GetCountersCommitted().Bytes)("indexing_debug", BackgroundController.DebugStringIndexation()); return; } @@ -680,8 +676,7 @@ void TColumnShard::SetupIndexation() { const TDuration durationLimit = NYDBTest::TControllers::GetColumnShardController()->GetGuaranteeIndexationInterval(); if (!force && InsertTable->GetCountersCommitted().Bytes < bytesLimit && TMonotonic::Now() < BackgroundController.GetLastIndexationInstant() + durationLimit) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "not_enough_data_and_too_frequency") - ("insert_size", InsertTable->GetCountersCommitted().Bytes); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "not_enough_data_and_too_frequency")("insert_size", InsertTable->GetCountersCommitted().Bytes); return; } @@ -722,12 +717,10 @@ class TCompactionAllocated: public NPrioritiesQueue::IRequest { public: TCompactionAllocated(const NActors::TActorId& tabletActorId) - : TabletActorId(tabletActorId) - { - + : TabletActorId(tabletActorId) { } }; -} // namespace +} // namespace void TColumnShard::SetupCompaction(const std::set& pathIds) { if (!AppDataVerified().ColumnShardConfig.GetCompactionEnabled() || @@ -808,8 +801,8 @@ bool TColumnShard::SetupTtl(const THashMap& pathTtls) { if (needWrites) { NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription( ResourceSubscribeActor, std::make_shared( - std::make_shared(std::move(ev), SelfId(), TabletID(), Counters.GetCompactionCounters(), GetLastCompletedTx()), - 0, i->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription)); + std::make_shared(std::move(ev), SelfId(), TabletID(), Counters.GetCompactionCounters(), GetLastCompletedTx()), + 0, i->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription)); } else { ev->SetPutStatus(NKikimrProto::OK); ActorContext().Send(SelfId(), std::move(ev)); @@ -1003,7 +996,6 @@ void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvConfirmFromInitiator: if (currentSession->IsConfirmed()) { currentSession->GetInitiatorController().ConfirmSuccess(ev->Get()->Record.GetSessionId()); } else { - auto txConclusion = SharingSessionsManager->ConfirmDestSession(this, currentSession); Execute(txConclusion.release(), ctx); } @@ -1032,17 +1024,26 @@ void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvSendDataFromSource::T return; } + // in current implementation the next loop will crash if schemas will be sent in the same package with the data, so adding this verify to ensure consistent behaviour + AFL_VERIFY(ev->Get()->Record.GetPathIdData().empty() || ev->Get()->Record.GetSchemeHistory().empty())("reason", "can not send schemas and data in the same package"); + THashMap dataByPathId; TBlobGroupSelector dsGroupSelector(Info()); for (auto&& i : ev->Get()->Record.GetPathIdData()) { - auto schema = TablesManager.GetPrimaryIndexAsVerified().GetVersionedIndex().GetLastSchema(); - AFL_VERIFY(schema); - auto data = NOlap::NDataSharing::NEvents::TPathIdData::BuildFromProto(i, schema->GetIndexInfo(), dsGroupSelector); + auto data = NOlap::NDataSharing::NEvents::TPathIdData::BuildFromProto(i, TablesManager.GetPrimaryIndexAsVerified().GetVersionedIndex(), dsGroupSelector); AFL_VERIFY(data.IsSuccess())("error", data.GetErrorMessage()); AFL_VERIFY(dataByPathId.emplace(i.GetPathId(), data.DetachResult()).second); } - auto txConclusion = currentSession->ReceiveData(this, dataByPathId, ev->Get()->Record.GetPackIdx(), (NOlap::TTabletId)ev->Get()->Record.GetSourceTabletId(), currentSession); + const auto& schemeHistoryProto = ev->Get()->Record.GetSchemeHistory(); + + std::vector schemas; + + for (auto&& i : schemeHistoryProto) { + schemas.emplace_back(i); + } + + auto txConclusion = currentSession->ReceiveData(this, std::move(dataByPathId), std::move(schemas), ev->Get()->Record.GetPackIdx(), (NOlap::TTabletId)ev->Get()->Record.GetSourceTabletId(), currentSession); if (!txConclusion) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_received_data"); } else { @@ -1227,4 +1228,4 @@ TDuration TColumnShard::GetMaxReadStaleness() { return NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(); } -} +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 74cd2fd8796b..3a082f1def96 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -177,6 +177,7 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa friend class NOlap::NDataSharing::TTxDataAckToSource; friend class NOlap::NDataSharing::TTxFinishAckToSource; friend class NOlap::NDataSharing::TTxFinishAckFromInitiator; + friend class NOlap::NDataSharing::TSourceSession; friend class NOlap::TStoragesManager; diff --git a/ydb/core/tx/columnshard/data_sharing/common/session/common.cpp b/ydb/core/tx/columnshard/data_sharing/common/session/common.cpp index f67244543a9d..0a065a29ce70 100644 --- a/ydb/core/tx/columnshard/data_sharing/common/session/common.cpp +++ b/ydb/core/tx/columnshard/data_sharing/common/session/common.cpp @@ -12,7 +12,7 @@ TString TCommonSession::DebugString() const { return TStringBuilder() << "{id=" << SessionId << ";context=" << TransferContext.DebugString() << ";state=" << State << ";}"; } -bool TCommonSession::TryStart(const NColumnShard::TColumnShard& shard) { +TConclusionStatus TCommonSession::TryStart(NColumnShard::TColumnShard& shard) { const NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("info", Info); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("info", "Start"); AFL_VERIFY(State == EState::Prepared); @@ -25,19 +25,21 @@ bool TCommonSession::TryStart(const NColumnShard::TColumnShard& shard) { const auto& g = index.GetGranuleVerified(i); for (auto&& p : g.GetPortionsOlderThenSnapshot(GetSnapshotBarrier())) { if (shard.GetDataLocksManager()->IsLocked(*p.second, { "sharing_session:" + GetSessionId() })) { - return false; + return TConclusionStatus::Fail("failed to start cursor: portion is locked"); } portionsByPath[i].emplace_back(p.second); } } if (shard.GetStoragesManager()->GetSharedBlobsManager()->HasExternalModifications()) { - return false; + return TConclusionStatus::Fail("failed to start cursor: has external modifications"); } - AFL_VERIFY(DoStart(shard, portionsByPath)); - State = EState::InProgress; - return true; + TConclusionStatus status = DoStart(shard, std::move(portionsByPath)); + if (status.Ok()) { + State = EState::InProgress; + } + return status; } void TCommonSession::PrepareToStart(const NColumnShard::TColumnShard& shard) { diff --git a/ydb/core/tx/columnshard/data_sharing/common/session/common.h b/ydb/core/tx/columnshard/data_sharing/common/session/common.h index d490babd46ff..9e78d6f4288b 100644 --- a/ydb/core/tx/columnshard/data_sharing/common/session/common.h +++ b/ydb/core/tx/columnshard/data_sharing/common/session/common.h @@ -1,8 +1,8 @@ #pragma once +#include #include -#include #include -#include +#include #include #include @@ -17,7 +17,7 @@ class TPortionDataAccessor; namespace NDataLocks { class TManager; } -} +} // namespace NKikimr::NOlap namespace NKikimr::NOlap::NDataSharing { @@ -41,17 +41,17 @@ class TCommonSession { YDB_READONLY(ui64, RuntimeId, GetNextRuntimeId()); std::shared_ptr LockGuard; EState State = EState::Created; + protected: TTransferContext TransferContext; - virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap>& portions) = 0; + virtual TConclusionStatus DoStart(NColumnShard::TColumnShard& shard, THashMap>&& portions) = 0; virtual THashSet GetPathIdsForStart() const = 0; + public: virtual ~TCommonSession() = default; TCommonSession(const TString& info) - : Info(info) - { - + : Info(info) { } TCommonSession(const TString& sessionId, const TString& info, const TTransferContext& transferContext) @@ -86,7 +86,7 @@ class TCommonSession { } void PrepareToStart(const NColumnShard::TColumnShard& shard); - bool TryStart(const NColumnShard::TColumnShard& shard); + TConclusionStatus TryStart(NColumnShard::TColumnShard& shard); void Finish(const NColumnShard::TColumnShard& shard, const std::shared_ptr& dataLocksManager); const TSnapshot& GetSnapshotBarrier() const { @@ -121,7 +121,6 @@ class TCommonSession { } return TConclusionStatus::Success(); } - }; -} \ No newline at end of file +} // namespace NKikimr::NOlap::NDataSharing diff --git a/ydb/core/tx/columnshard/data_sharing/destination/events/transfer.h b/ydb/core/tx/columnshard/data_sharing/destination/events/transfer.h index 0b3401d15270..1ec528e9e436 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/events/transfer.h +++ b/ydb/core/tx/columnshard/data_sharing/destination/events/transfer.h @@ -4,17 +4,14 @@ #include #include #include +#include +#include #include - -namespace NKikimr::NOlap { -class TVersionedIndex; -} - namespace NKikimr::NOlap::NDataSharing { class TSharedBlobsManager; class TTaskForTablet; -} // namespace NKikimr::NOlap::NDataSharing +} // namespace NKikimr::NOlap::NDataSharing namespace NKikimr::NOlap::NDataSharing::NEvents { @@ -26,13 +23,14 @@ class TPathIdData { TPathIdData() = default; TConclusionStatus DeserializeFromProto( - const NKikimrColumnShardDataSharingProto::TPathIdData& proto, const TIndexInfo& indexInfo, const IBlobGroupSelector& groupSelector) { + const NKikimrColumnShardDataSharingProto::TPathIdData& proto, const TVersionedIndex& versionedIndex, const IBlobGroupSelector& groupSelector) { if (!proto.HasPathId()) { return TConclusionStatus::Fail("no path id in proto"); } PathId = proto.GetPathId(); for (auto&& portionProto : proto.GetPortions()) { - TConclusion portion = TPortionDataAccessor::BuildFromProto(portionProto, indexInfo, groupSelector); + const auto schema = versionedIndex.GetSchemaVerified(portionProto.GetSchemaVersion()); + TConclusion portion = TPortionDataAccessor::BuildFromProto(portionProto, schema->GetIndexInfo(), groupSelector); if (!portion) { return portion.GetError(); } @@ -71,9 +69,9 @@ class TPathIdData { }; static TConclusion BuildFromProto( - const NKikimrColumnShardDataSharingProto::TPathIdData& proto, const TIndexInfo& indexInfo, const IBlobGroupSelector& groupSelector) { + const NKikimrColumnShardDataSharingProto::TPathIdData& proto, const TVersionedIndex& versionedIndex, const IBlobGroupSelector& groupSelector) { TPathIdData result; - auto resultParsing = result.DeserializeFromProto(proto, indexInfo, groupSelector); + auto resultParsing = result.DeserializeFromProto(proto, versionedIndex, groupSelector); if (!resultParsing) { return resultParsing; } else { @@ -87,13 +85,17 @@ struct TEvSendDataFromSource: public NActors::TEventPB& pathIdData) { + const TString& sessionId, const ui32 packIdx, const TTabletId sourceTabletId, const THashMap& pathIdData, TArrayRef schemas) { Record.SetSessionId(sessionId); Record.SetPackIdx(packIdx); Record.SetSourceTabletId((ui64)sourceTabletId); for (auto&& i : pathIdData) { i.second.SerializeToProto(*Record.AddPathIdData()); } + + for (auto&& i : schemas) { + *Record.AddSchemeHistory() = i.GetProto(); + } } }; @@ -107,4 +109,4 @@ struct TEvFinishedFromSource: public NActors::TEventPB> TDestinationSession::ReceiveData(NColumnShard::TColumnShard* self, - const THashMap& data, const ui32 receivedPackIdx, const TTabletId sourceTabletId, + THashMap&& data, std::vector&& schemas, const ui32 receivedPackIdx, const TTabletId sourceTabletId, const std::shared_ptr& selfPtr) { auto result = GetCursorVerified(sourceTabletId).ReceiveData(receivedPackIdx); if (!result) { return result; } - return std::unique_ptr(new TTxDataFromSource(self, selfPtr, data, sourceTabletId)); + return std::unique_ptr(new TTxDataFromSource(self, selfPtr, std::move(data), std::move(schemas), sourceTabletId)); } NKikimr::TConclusion> TDestinationSession::ReceiveFinished( @@ -160,8 +160,8 @@ NKikimr::TConclusionStatus TDestinationSession::DeserializeCursorFromProto( return TConclusionStatus::Success(); } -bool TDestinationSession::DoStart( - const NColumnShard::TColumnShard& shard, const THashMap>& portions) { +TConclusionStatus TDestinationSession::DoStart( + NColumnShard::TColumnShard& shard, THashMap>&& portions) { AFL_VERIFY(IsConfirmed()); NYDBTest::TControllers::GetColumnShardController()->OnDataSharingStarted(shard.TabletID(), GetSessionId()); THashMap> local; @@ -172,7 +172,7 @@ bool TDestinationSession::DoStart( } std::swap(CurrentBlobIds, local); SendCurrentCursorAck(shard, {}); - return true; + return TConclusionStatus::Success(); } bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionDataAccessor& portion) { @@ -194,4 +194,4 @@ bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, con return newCounter; } -} // namespace NKikimr::NOlap::NDataSharing +} // namespace NKikimr::NOlap::NDataSharing diff --git a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.h b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.h index b96996a5a7f1..7b47a013b008 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.h +++ b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -15,7 +16,7 @@ class TColumnShard; namespace NKikimr::NOlap { class TColumnEngineForLogs; class IStoragesManager; -} // namespace NKikimr::NOlap +} // namespace NKikimr::NOlap namespace NKikimr::NOlap::NDataSharing { @@ -78,7 +79,7 @@ class TDestinationSession: public TCommonSession { THashMap> CurrentBlobIds; protected: - virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap>& portions) override; + virtual TConclusionStatus DoStart(NColumnShard::TColumnShard& shard, THashMap>&& portions) override; virtual THashSet GetPathIdsForStart() const override { THashSet result; for (auto&& i : PathIds) { @@ -122,8 +123,8 @@ class TDestinationSession: public TCommonSession { [[nodiscard]] TConclusion> AckInitiatorFinished(NColumnShard::TColumnShard* self, const std::shared_ptr& selfPtr); - [[nodiscard]] TConclusion> ReceiveData(NColumnShard::TColumnShard* self, const THashMap& data, - const ui32 receivedPackIdx, const TTabletId sourceTabletId, const std::shared_ptr& selfPtr); + [[nodiscard]] TConclusion> ReceiveData(NColumnShard::TColumnShard* self, THashMap&& data, + std::vector&& schemas, const ui32 receivedPackIdx, const TTabletId sourceTabletId, const std::shared_ptr& selfPtr); NKikimrColumnShardDataSharingProto::TDestinationSession::TFullCursor SerializeCursorToProto() const; [[nodiscard]] TConclusionStatus DeserializeCursorFromProto(const NKikimrColumnShardDataSharingProto::TDestinationSession::TFullCursor& proto); @@ -131,4 +132,4 @@ class TDestinationSession: public TCommonSession { [[nodiscard]] TConclusionStatus DeserializeDataFromProto(const NKikimrColumnShardDataSharingProto::TDestinationSession& proto, const TColumnEngineForLogs& index); }; -} // namespace NKikimr::NOlap::NDataSharing +} // namespace NKikimr::NOlap::NDataSharing diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp index ca5bb8851b78..d8f33c2906e1 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp @@ -6,8 +6,20 @@ namespace NKikimr::NOlap::NDataSharing { bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { using namespace NKikimr::NColumnShard; - TDbWrapper dbWrapper(txc.DB, nullptr); + + NIceDb::TNiceDb db(txc.DB); + for (auto info : SchemeHistory) { + info.SaveToLocalDb(db); + } + auto& index = Self->TablesManager.MutablePrimaryIndexAsVerified(); + + for (auto& info : SchemeHistory) { + index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetSchema()); + } + + TDbWrapper dbWrapper(txc.DB, nullptr); + { ui64* lastPortionPtr = index.GetLastPortionPointer(); for (auto&& i : PortionsByPathId) { @@ -24,7 +36,6 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, p.SaveToDatabase(dbWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false); } } - NIceDb::TNiceDb db(txc.DB); db.Table().Key(Session->GetSessionId()) .Update(NIceDb::TUpdate(Session->SerializeCursorToProto().SerializeAsString())); return true; @@ -35,12 +46,12 @@ void TTxDataFromSource::DoComplete(const TActorContext& /*ctx*/) { Session->SendCurrentCursorAck(*Self, SourceTabletId); } -TTxDataFromSource::TTxDataFromSource(NColumnShard::TColumnShard* self, const std::shared_ptr& session, const THashMap& portionsByPathId, const TTabletId sourceTabletId) +TTxDataFromSource::TTxDataFromSource(NColumnShard::TColumnShard* self, const std::shared_ptr& session, THashMap&& portionsByPathId, std::vector&& schemas, const TTabletId sourceTabletId) : TBase(self) , Session(session) - , PortionsByPathId(portionsByPathId) - , SourceTabletId(sourceTabletId) -{ + , PortionsByPathId(std::move(portionsByPathId)) + , SchemeHistory(std::move(schemas)) + , SourceTabletId(sourceTabletId) { for (auto&& i : PortionsByPathId) { for (ui32 p = 0; p < i.second.GetPortions().size();) { if (Session->TryTakePortionBlobs(Self->GetIndexAs().GetVersionedIndex(), i.second.GetPortions()[p])) { @@ -53,5 +64,4 @@ TTxDataFromSource::TTxDataFromSource(NColumnShard::TColumnShard* self, const std } } } - } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.h b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.h index 82b69ac41fb6..91d48eb76346 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.h +++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.h @@ -1,10 +1,11 @@ #pragma once +#include #include #include -#include -#include #include -#include +#include +#include +#include namespace NKikimr::NOlap::NDataSharing { @@ -14,12 +15,13 @@ class TTxDataFromSource: public TExtendedTransactionBase Session; THashMap PortionsByPathId; THashMap> SharedBlobIds; + std::vector SchemeHistory; const TTabletId SourceTabletId; protected: virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override; virtual void DoComplete(const TActorContext& ctx) override; public: - TTxDataFromSource(NColumnShard::TColumnShard* self, const std::shared_ptr& session, const THashMap& portionsByPathId, const TTabletId sourceTabletId); + TTxDataFromSource(NColumnShard::TColumnShard* self, const std::shared_ptr& session, THashMap&& portionsByPathId, std::vector&& schemas, const TTabletId sourceTabletId); TTxType GetTxType() const override { return NColumnShard::TXTYPE_DATA_SHARING_DATA_FROM_SOURCE; } }; diff --git a/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp b/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp index 7f5a8cc9f5be..a1bc57a37433 100644 --- a/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp +++ b/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp @@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NDataSharing { -void TSessionsManager::Start(const NColumnShard::TColumnShard& shard) const { +void TSessionsManager::Start(NColumnShard::TColumnShard& shard) const { NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build()("sessions", "start")("tablet_id", shard.TabletID()); for (auto&& i : SourceSessions) { if (i.second->IsReadyForStarting()) { @@ -22,12 +22,15 @@ void TSessionsManager::Start(const NColumnShard::TColumnShard& shard) const { for (auto&& i : SourceSessions) { if (i.second->IsPrepared()) { - i.second->TryStart(shard); + TConclusionStatus status = i.second->TryStart(shard); + AFL_VERIFY(status.Ok())("failed to start source session", status.GetErrorMessage()); } } for (auto&& i : DestSessions) { if (i.second->IsPrepared() && i.second->IsConfirmed()) { - i.second->TryStart(shard); + TConclusionStatus status = i.second->TryStart(shard); + AFL_VERIFY(status.Ok())("failed to start dest session", status.GetErrorMessage()); + if (!i.second->GetSourcesInProgressCount()) { i.second->Finish(shard, shard.GetDataLocksManager()); } diff --git a/ydb/core/tx/columnshard/data_sharing/manager/sessions.h b/ydb/core/tx/columnshard/data_sharing/manager/sessions.h index a2e5efdaa60b..8c49b64ece85 100644 --- a/ydb/core/tx/columnshard/data_sharing/manager/sessions.h +++ b/ydb/core/tx/columnshard/data_sharing/manager/sessions.h @@ -28,7 +28,7 @@ class TSessionsManager { return SharingSessions.Val(); } - void Start(const NColumnShard::TColumnShard& shard) const; + void Start(NColumnShard::TColumnShard& shard) const; std::shared_ptr GetSourceSession(const TString& sessionId) const { auto it = SourceSessions.find(sessionId); diff --git a/ydb/core/tx/columnshard/data_sharing/protos/events.proto b/ydb/core/tx/columnshard/data_sharing/protos/events.proto index 39d329030197..bc48fa44dde6 100644 --- a/ydb/core/tx/columnshard/data_sharing/protos/events.proto +++ b/ydb/core/tx/columnshard/data_sharing/protos/events.proto @@ -3,6 +3,7 @@ import "ydb/core/tx/columnshard/data_sharing/protos/links.proto"; import "ydb/core/tx/columnshard/data_sharing/protos/data.proto"; import "ydb/core/tx/columnshard/data_sharing/protos/sessions.proto"; import "ydb/core/tx/columnshard/data_sharing/protos/initiator.proto"; +import "ydb/core/protos/tx_columnshard.proto"; package NKikimrColumnShardDataSharingProto; @@ -23,6 +24,7 @@ message TEvSendDataFromSource { optional uint64 PackIdx = 2; repeated TPathIdData PathIdData = 3; optional uint64 SourceTabletId = 4; + repeated NKikimrTxColumnShard.TSchemaPresetVersionInfo SchemeHistory = 5; } message TEvAckDataToSource { diff --git a/ydb/core/tx/columnshard/data_sharing/protos/sessions.proto b/ydb/core/tx/columnshard/data_sharing/protos/sessions.proto index cd0397a9a21d..10c9a1903ed0 100644 --- a/ydb/core/tx/columnshard/data_sharing/protos/sessions.proto +++ b/ydb/core/tx/columnshard/data_sharing/protos/sessions.proto @@ -2,6 +2,7 @@ package NKikimrColumnShardDataSharingProto; import "ydb/core/tx/columnshard/common/protos/snapshot.proto"; import "ydb/core/tx/columnshard/data_sharing/protos/initiator.proto"; +import "ydb/core/protos/tx_columnshard.proto"; message TDestinationRemapIds { optional uint64 SourcePathId = 1; @@ -47,6 +48,8 @@ message TSourceSession { optional uint32 PackIdx = 5; optional uint32 AckReceivedForPackIdx = 6[default = 0]; repeated uint64 LinksModifiedTablets = 7; + optional uint32 NextSchemasIntervalBegin = 8; + optional uint32 NextSchemasIntervalEnd = 9; } message TPathPortionsHash { @@ -56,6 +59,7 @@ message TSourceSession { message TCursorStatic { repeated TPathPortionsHash PathHashes = 7; + repeated NKikimrTxColumnShard.TSchemaPresetVersionInfo SchemeHistory = 8; } } diff --git a/ydb/core/tx/columnshard/data_sharing/protos/transfer.proto b/ydb/core/tx/columnshard/data_sharing/protos/transfer.proto index 8d40ba1abfc1..20a9c3f40ae7 100644 --- a/ydb/core/tx/columnshard/data_sharing/protos/transfer.proto +++ b/ydb/core/tx/columnshard/data_sharing/protos/transfer.proto @@ -6,6 +6,7 @@ message TEvSendDataFromSource { optional NActorsProto.TActorId SourceActorId = 1; optional string SharingId = 2; repeated TPathIdData DataByPathId = 3; + repeated NKikimrTxColumnShard.TSchemaPresetVersionInfo SchemeHistory = 4; } message TEvAckDataToSource { diff --git a/ydb/core/tx/columnshard/data_sharing/protos/ya.make b/ydb/core/tx/columnshard/data_sharing/protos/ya.make index 3b50d7c2303c..445f5ca00f7a 100644 --- a/ydb/core/tx/columnshard/data_sharing/protos/ya.make +++ b/ydb/core/tx/columnshard/data_sharing/protos/ya.make @@ -9,11 +9,11 @@ SRCS( ) PEERDIR( + ydb/core/protos ydb/core/tx/columnshard/engines/protos ydb/core/tx/columnshard/common/protos ydb/library/actors/protos ydb/core/tx/columnshard/blobs_action/protos - ) END() diff --git a/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp b/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp index 8467e93be9c6..fdfed64c8eb4 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp +++ b/ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp @@ -59,7 +59,31 @@ void TSourceCursor::BuildSelection(const std::shared_ptr& stor std::swap(Selected, result); } +bool TSourceCursor::NextSchemas() { + NextSchemasIntervalBegin = NextSchemasIntervalEnd; + + if (NextSchemasIntervalEnd == SchemeHistory.size()) { + return false; + } + + i32 columnsToSend = 0; + const i32 maxColumnsToSend = 10000; + + // limit the count of schemas to send based on their size in columns + // maxColumnsToSend is pretty random value, so I don't care if columnsToSend would be greater then this value + for (; NextSchemasIntervalEnd < SchemeHistory.size() && columnsToSend < maxColumnsToSend; ++NextSchemasIntervalEnd) { + columnsToSend += SchemeHistory[NextSchemasIntervalEnd].ColumnsSize(); + } + + ++PackIdx; + + return true; +} + bool TSourceCursor::Next(const std::shared_ptr& storagesManager, const TVersionedIndex& index) { + if (NextSchemas()) { + return true; + } PreviousSelected = std::move(Selected); LinksModifiedTablets.clear(); Selected.clear(); @@ -83,6 +107,8 @@ NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic TSourceCursor NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic result; result.SetStartPathId(StartPathId); result.SetStartPortionId(StartPortionId); + result.SetNextSchemasIntervalBegin(NextSchemasIntervalBegin); + result.SetNextSchemasIntervalEnd(NextSchemasIntervalEnd); if (NextPathId) { result.SetNextPathId(*NextPathId); } @@ -104,6 +130,10 @@ NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic TSourceCursor: pathHash->SetPathId(i.first); pathHash->SetHash(i.second); } + + for (auto&& i : SchemeHistory) { + *result.AddSchemeHistory() = i.GetProto(); + } return result; } @@ -112,6 +142,8 @@ NKikimr::TConclusionStatus TSourceCursor::DeserializeFromProto(const NKikimrColu StartPathId = proto.GetStartPathId(); StartPortionId = proto.GetStartPortionId(); PackIdx = proto.GetPackIdx(); + NextSchemasIntervalBegin = proto.GetNextSchemasIntervalBegin(); + NextSchemasIntervalEnd = proto.GetNextSchemasIntervalEnd(); if (!PackIdx) { return TConclusionStatus::Fail("Incorrect proto cursor PackIdx value: " + proto.DebugString()); } @@ -132,6 +164,10 @@ NKikimr::TConclusionStatus TSourceCursor::DeserializeFromProto(const NKikimrColu for (auto&& i : protoStatic.GetPathHashes()) { PathPortionHashes.emplace(i.GetPathId(), i.GetHash()); } + + for (auto&& i : protoStatic.GetSchemeHistory()) { + SchemeHistory.emplace_back(i); + } if (PathPortionHashes.empty()) { AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", "empty static cursor"); } else { @@ -158,7 +194,8 @@ void TSourceCursor::SaveToDatabase(NIceDb::TNiceDb& db, const TString& sessionId } bool TSourceCursor::Start(const std::shared_ptr& storagesManager, - const THashMap>& portions, const TVersionedIndex& index) { + THashMap>&& portions, std::vector&& schemeHistory, const TVersionedIndex& index) { + SchemeHistory = std::move(schemeHistory); AFL_VERIFY(!IsStartedFlag); std::map> local; NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(0); @@ -185,6 +222,9 @@ bool TSourceCursor::Start(const std::shared_ptr& storagesManag AFL_VERIFY(!StartPortionId); NextPathId = std::nullopt; NextPortionId = std::nullopt; + // we don't need to send scheme history if we don't have data + // this also invalidates cursor in this case + SchemeHistory.clear(); return true; } else if (!StartPathId) { AFL_VERIFY(PortionsForSend.begin()->second.size()); @@ -197,4 +237,4 @@ bool TSourceCursor::Start(const std::shared_ptr& storagesManag IsStartedFlag = true; return true; } -} // namespace NKikimr::NOlap::NDataSharing +} // namespace NKikimr::NOlap::NDataSharing diff --git a/ydb/core/tx/columnshard/data_sharing/source/session/cursor.h b/ydb/core/tx/columnshard/data_sharing/source/session/cursor.h index 1edc2842700f..44dcdc33c99f 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/session/cursor.h +++ b/ydb/core/tx/columnshard/data_sharing/source/session/cursor.h @@ -1,12 +1,13 @@ #pragma once +#include #include #include -#include +#include namespace NKikimr::NOlap { class TColumnEngineForLogs; class TVersionedIndex; -} +} // namespace NKikimr::NOlap namespace NKikimr::NIceDb { class TNiceDb; @@ -22,6 +23,7 @@ class TSourceCursor { THashMap PreviousSelected; THashMap Selected; THashMap Links; + std::vector SchemeHistory; YDB_READONLY(ui64, StartPathId, 0); YDB_READONLY(ui64, StartPortionId, 0); YDB_READONLY(ui64, PackIdx, 0); @@ -29,6 +31,11 @@ class TSourceCursor { TTransferContext TransferContext; std::optional NextPathId = 0; std::optional NextPortionId = 0; + + // Begin/End of the next slice of SchemeHistory + ui64 NextSchemasIntervalBegin = 0; + ui64 NextSchemasIntervalEnd = 0; + THashSet LinksModifiedTablets; ui64 AckReceivedForPackIdx = 0; std::set PathIds; @@ -39,6 +46,8 @@ class TSourceCursor { NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic SerializeDynamicToProto() const; NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic SerializeStaticToProto() const; + bool NextSchemas(); + public: bool IsAckDataReceived() const { return AckReceivedForPackIdx == PackIdx; @@ -87,6 +96,10 @@ class TSourceCursor { return PreviousSelected; } + TArrayRef GetSelectedSchemas() const { + return TArrayRef(SchemeHistory.data() + NextSchemasIntervalBegin, NextSchemasIntervalEnd - NextSchemasIntervalBegin); + } + const THashMap& GetSelected() const { return Selected; } @@ -98,17 +111,18 @@ class TSourceCursor { bool Next(const std::shared_ptr& storagesManager, const TVersionedIndex& index); bool IsValid() { - return Selected.size(); + AFL_VERIFY(NextSchemasIntervalBegin <= SchemeHistory.size()); + return NextSchemasIntervalBegin < SchemeHistory.size() || Selected.size(); } TSourceCursor(const TTabletId selfTabletId, const std::set& pathIds, const TTransferContext transferContext); void SaveToDatabase(class NIceDb::TNiceDb& db, const TString& sessionId); - bool Start(const std::shared_ptr& storagesManager, const THashMap>& portions, - const TVersionedIndex& index); + bool Start(const std::shared_ptr& storagesManager, THashMap>&& portions, + std::vector&& schemeHistory, const TVersionedIndex& index); [[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic& proto, const NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic& protoStatic); }; -} \ No newline at end of file +} // namespace NKikimr::NOlap::NDataSharing diff --git a/ydb/core/tx/columnshard/data_sharing/source/session/source.cpp b/ydb/core/tx/columnshard/data_sharing/source/session/source.cpp index 5a0c56da94a0..40228cb05dd3 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/session/source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/source/session/source.cpp @@ -3,12 +3,13 @@ #include #include #include +#include #include #include namespace NKikimr::NOlap::NDataSharing { -NKikimr::TConclusionStatus TSourceSession::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TSourceSession& proto, +NKikimr::TConclusionStatus TSourceSession::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TSourceSession& proto, const std::optional& protoCursor, const std::optional& protoCursorStatic) { auto parseBase = TBase::DeserializeFromProto(proto); @@ -79,7 +80,8 @@ void TSourceSession::ActualizeDestination(const NColumnShard::TColumnShard& shar if (Cursor->IsValid()) { if (!Cursor->IsAckDataReceived()) { const THashMap& packPortions = Cursor->GetSelected(); - auto ev = std::make_unique(GetSessionId(), Cursor->GetPackIdx(), SelfTabletId, packPortions); + + auto ev = std::make_unique(GetSessionId(), Cursor->GetPackIdx(), SelfTabletId, packPortions, Cursor->GetSelectedSchemas()); NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.release(), (ui64)DestinationTabletId, true), IEventHandle::FlagTrackDelivery, GetRuntimeId()); } @@ -102,14 +104,14 @@ void TSourceSession::ActualizeDestination(const NColumnShard::TColumnShard& shar } } -bool TSourceSession::DoStart(const NColumnShard::TColumnShard& shard, const THashMap>& portions) { +void TSourceSession::StartCursor(const NColumnShard::TColumnShard& shard, THashMap>&& portions, std::vector&& schemeHistory) { AFL_VERIFY(Cursor); - if (Cursor->Start(shard.GetStoragesManager(), portions, shard.GetIndexAs().GetVersionedIndex())) { - ActualizeDestination(shard, shard.GetDataLocksManager()); - return true; - } else { - return false; - } + AFL_VERIFY(Cursor->Start(shard.GetStoragesManager(), std::move(portions), std::move(schemeHistory), shard.GetIndexAs().GetVersionedIndex())); + ActualizeDestination(shard, shard.GetDataLocksManager()); } -} \ No newline at end of file +TConclusionStatus TSourceSession::DoStart(NColumnShard::TColumnShard& shard, THashMap>&& portions) { + shard.Execute(new TTxStartSourceCursor(this, &shard, std::move(portions), "start_source_cursor")); + return TConclusionStatus::Success(); +} +} // namespace NKikimr::NOlap::NDataSharing diff --git a/ydb/core/tx/columnshard/data_sharing/source/session/source.h b/ydb/core/tx/columnshard/data_sharing/source/session/source.h index 489c012f5b49..926d137b9a37 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/session/source.h +++ b/ydb/core/tx/columnshard/data_sharing/source/session/source.h @@ -1,7 +1,9 @@ #pragma once #include "cursor.h" -#include + #include +#include +#include namespace NKikimr::NIceDb { class TNiceDb; @@ -18,8 +20,9 @@ class TSourceSession: public TCommonSession { std::shared_ptr Cursor; YDB_READONLY_DEF(std::set, PathIds); TTabletId DestinationTabletId = TTabletId(0); + protected: - virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap>& portions) override; + virtual TConclusionStatus DoStart(NColumnShard::TColumnShard& shard, THashMap>&& portions) override; virtual THashSet GetPathIdsForStart() const override { THashSet result; for (auto&& i : PathIds) { @@ -27,20 +30,18 @@ class TSourceSession: public TCommonSession { } return result; } + public: TSourceSession(const TTabletId selfTabletId) : TBase("source_proto") - , SelfTabletId(selfTabletId) - { - + , SelfTabletId(selfTabletId) { } TSourceSession(const TString& sessionId, const TTransferContext& transfer, const TTabletId selfTabletId, const std::set& pathIds, const TTabletId destTabletId) : TBase(sessionId, "source_base", transfer) , SelfTabletId(selfTabletId) , PathIds(pathIds) - , DestinationTabletId(destTabletId) - { + , DestinationTabletId(destTabletId) { } TTabletId GetDestinationTabletId() const { @@ -52,10 +53,9 @@ class TSourceSession: public TCommonSession { } bool IsEqualTo(const TSourceSession& item) const { - return - TBase::IsEqualTo(item) && - DestinationTabletId == item.DestinationTabletId && - PathIds == item.PathIds; + return TBase::IsEqualTo(item) && + DestinationTabletId == item.DestinationTabletId && + PathIds == item.PathIds; } std::shared_ptr GetCursorVerified() const { @@ -64,16 +64,9 @@ class TSourceSession: public TCommonSession { } void SaveCursorToDatabase(NIceDb::TNiceDb& db); - /* - bool TryNextCursor(const ui32 packIdx, const std::shared_ptr& storagesManager, const TVersionedIndex& index) { - AFL_VERIFY(Cursor); - if (packIdx != Cursor->GetPackIdx()) { - return false; - } - Cursor->Next(storagesManager, index); - return true; - } -*/ + + void StartCursor(const NColumnShard::TColumnShard& shard, THashMap>&& portions, std::vector&& schemeHistory); + [[nodiscard]] TConclusion> AckFinished(NColumnShard::TColumnShard* self, const std::shared_ptr& selfPtr); [[nodiscard]] TConclusion> AckData(NColumnShard::TColumnShard* self, const ui32 receivedPackIdx, const std::shared_ptr& selfPtr); [[nodiscard]] TConclusion> AckLinks(NColumnShard::TColumnShard* self, const TTabletId tabletId, const ui32 packIdx, const std::shared_ptr& selfPtr); @@ -91,7 +84,7 @@ class TSourceSession: public TCommonSession { } [[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TSourceSession& proto, - const std::optional& protoCursor, + const std::optional& protoCursor, const std::optional& protoCursorStatic); }; -} \ No newline at end of file +} // namespace NKikimr::NOlap::NDataSharing diff --git a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp index bad3535ea06f..438271ef5f7c 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp @@ -30,6 +30,8 @@ bool TTxDataAckToSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc } void TTxDataAckToSource::DoComplete(const TActorContext& /*ctx*/) { + AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("TTxDataAckToSource::DoComplete", "1"); + Session->ActualizeDestination(*Self, Self->GetDataLocksManager()); } diff --git a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_start_source_cursor.cpp b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_start_source_cursor.cpp new file mode 100644 index 000000000000..e88d270b3a63 --- /dev/null +++ b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_start_source_cursor.cpp @@ -0,0 +1,40 @@ +#include "tx_start_source_cursor.h" + +#include +#include + +namespace NKikimr::NOlap::NDataSharing { + +bool TTxStartSourceCursor::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { + using namespace NColumnShard; + + std::vector schemeHistory; + + NIceDb::TNiceDb db(txc.DB); + + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + TSchemaPreset::TSchemaPresetVersionInfo info; + Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue())); + + schemeHistory.push_back(info); + + if (!rowset.Next()) { + return false; + } + } + + std::sort(schemeHistory.begin(), schemeHistory.end()); + + Session->StartCursor(*Self, std::move(Portions), std::move(schemeHistory)); + return true; +} + +void TTxStartSourceCursor::DoComplete(const TActorContext& /*ctx*/) { +} + +} // namespace NKikimr::NOlap::NDataSharing diff --git a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_start_source_cursor.h b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_start_source_cursor.h new file mode 100644 index 000000000000..b558ab4eabd4 --- /dev/null +++ b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_start_source_cursor.h @@ -0,0 +1,31 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NOlap::NDataSharing { + +class TTxStartSourceCursor: public TExtendedTransactionBase { +private: + using TBase = TExtendedTransactionBase; + + TSourceSession* Session; + THashMap> Portions; + +protected: + virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override; + virtual void DoComplete(const TActorContext& ctx) override; + +public: + TTxStartSourceCursor(TSourceSession* session, NColumnShard::TColumnShard* self, THashMap>&& portions, const TString& info) + : TBase(self, info) + , Session(session) + , Portions(std::move(portions)) { + } + + TTxType GetTxType() const override { + return NColumnShard::TXTYPE_DATA_SHARING_START_SOURCE_CURSOR; + } +}; + +} // namespace NKikimr::NOlap::NDataSharing diff --git a/ydb/core/tx/columnshard/data_sharing/source/transactions/ya.make b/ydb/core/tx/columnshard/data_sharing/source/transactions/ya.make index 90269b952ed4..824427fed1a8 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/transactions/ya.make +++ b/ydb/core/tx/columnshard/data_sharing/source/transactions/ya.make @@ -5,6 +5,7 @@ SRCS( tx_data_ack_to_source.cpp tx_finish_ack_to_source.cpp tx_write_source_cursor.cpp + tx_start_source_cursor.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 786d72e5c323..1c9ca5bd8936 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -266,6 +266,14 @@ class IColumnEngine { Diff = info.GetDiff(); } } + + ui64 GetVersion() const { + if (Schema) { + return Schema->GetVersion(); + } + AFL_VERIFY(Diff); + return Diff->GetVersion(); + } }; static ui64 GetMetadataLimit(); @@ -298,6 +306,8 @@ class IColumnEngine { virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr changes, const TSnapshot& snapshot) noexcept = 0; virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0; virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0; + virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0; + virtual const TMap>& GetStats() const = 0; virtual const TColumnEngineStats& GetTotalStats() = 0; virtual ui64 MemoryUsage() const { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 715d382bdc44..facf3d88eb03 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -152,6 +152,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons std::optional indexInfoOptional; if (schema.GetDiff()) { AFL_VERIFY(!VersionedIndex.IsEmpty()); + indexInfoOptional = NOlap::TIndexInfo::BuildFromProto( *schema.GetDiff(), VersionedIndex.GetLastSchema()->GetIndexInfo(), StoragesManager, SchemaObjectsCache); } else { @@ -161,6 +162,36 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons RegisterSchemaVersion(snapshot, std::move(*indexInfoOptional)); } +void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) { + AFL_VERIFY(!VersionedIndex.IsEmpty()); + + ui64 version = schema.GetVersion(); + + ISnapshotSchema::TPtr prevSchema = VersionedIndex.GetLastSchemaBeforeOrEqualSnapshotOptional(version); + + if (prevSchema && version == prevSchema->GetVersion()) { + // skip already registered version + return; + } + + ISnapshotSchema::TPtr secondLast = VersionedIndex.GetLastSchemaBeforeOrEqualSnapshotOptional(VersionedIndex.GetLastSchema()->GetVersion() - 1); + + AFL_VERIFY(!secondLast || secondLast->GetVersion() <= version)("reason", "incorrect schema registration order"); + + std::optional indexInfoOptional; + if (schema.GetDiff()) { + AFL_VERIFY(prevSchema)("reason", "no base schema to apply diff for"); + + indexInfoOptional = NOlap::TIndexInfo::BuildFromProto( + *schema.GetDiff(), prevSchema->GetIndexInfo(), StoragesManager, SchemaObjectsCache); + } else { + indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(schema.GetSchemaVerified(), StoragesManager, SchemaObjectsCache); + } + + AFL_VERIFY(indexInfoOptional); + VersionedIndex.AddIndex(snapshot, std::move(*indexInfoOptional)); +} + bool TColumnEngineForLogs::Load(IDbWrapper& db) { Y_ABORT_UNLESS(!Loaded); Loaded = true; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index aa0f41a7cdec..8274dea90855 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -132,6 +132,7 @@ class TColumnEngineForLogs: public IColumnEngine { void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override; void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override; + void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override; std::shared_ptr Select( ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override; diff --git a/ydb/core/tx/columnshard/engines/scheme/schema_version.cpp b/ydb/core/tx/columnshard/engines/scheme/schema_version.cpp new file mode 100644 index 000000000000..051a743efc13 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/schema_version.cpp @@ -0,0 +1,4 @@ +#include "schema_version.h" + +namespace NKikimr::NOlap { +} diff --git a/ydb/core/tx/columnshard/engines/scheme/schema_version.h b/ydb/core/tx/columnshard/engines/scheme/schema_version.h new file mode 100644 index 000000000000..e52645a26ba0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/schema_version.h @@ -0,0 +1,46 @@ +#pragma once +#include +#include + +#include + +namespace NKikimr::NOlap { +class TSchemaPresetVersionInfo { +private: + NKikimrTxColumnShard::TSchemaPresetVersionInfo Proto; + +public: + TSchemaPresetVersionInfo(const NKikimrTxColumnShard::TSchemaPresetVersionInfo& proto) + : Proto(proto) { + } + + const NKikimrTxColumnShard::TSchemaPresetVersionInfo& GetProto() const { + return Proto; + } + + auto operator<=>(const TSchemaPresetVersionInfo& rhs) const { + return std::tuple(Proto.GetId(), Proto.GetSinceStep(), Proto.GetSinceTxId()) <=> std::tuple(rhs.Proto.GetId(), rhs.Proto.GetSinceStep(), rhs.Proto.GetSinceTxId()); + } + + void SaveToLocalDb(NIceDb::TNiceDb& db) { + using namespace NKikimr::NColumnShard; + db.Table().Key(Proto.GetId(), Proto.GetSinceStep(), Proto.GetSinceTxId()).Update(Proto.SerializeAsString()); + } + + TSnapshot GetSnapshot() const { + return TSnapshot(Proto.GetSinceStep(), Proto.GetSinceTxId()); + } + + NOlap::IColumnEngine::TSchemaInitializationData GetSchema() const { + return NOlap::IColumnEngine::TSchemaInitializationData(Proto); + } + + ui64 ColumnsSize() const { + if (Proto.HasSchema()) { + return Proto.GetSchema().ColumnsSize(); + } + AFL_VERIFY(Proto.HasDiff()); + return Proto.GetDiff().UpsertColumnsSize() + Proto.GetDiff().UpsertIndexesSize(); + } +}; +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h index fe554a790d8f..e399535a96a7 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h @@ -91,10 +91,20 @@ class TVersionedIndex { } } Y_ABORT_UNLESS(!Snapshots.empty()); -// Y_ABORT_UNLESS(version.IsZero()); return Snapshots.begin()->second; } + ISnapshotSchema::TPtr GetLastSchemaBeforeOrEqualSnapshotOptional(const ui64 version) const { + ISnapshotSchema::TPtr res = nullptr; + for (auto it = SnapshotByVersion.rbegin(); it != SnapshotByVersion.rend(); ++it) { + if (it->first <= version) { + res = it->second; + break; + } + } + return res; + } + ISnapshotSchema::TPtr GetLastSchema() const { Y_ABORT_UNLESS(!Snapshots.empty()); return Snapshots.rbegin()->second; diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make index e74a0ac2079e..a8b2572ac574 100644 --- a/ydb/core/tx/columnshard/engines/scheme/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/ya.make @@ -9,6 +9,7 @@ SRCS( column_features.cpp schema_diff.cpp objects_cache.cpp + schema_version.cpp ) PEERDIR(