diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 1c49a8d7d99b..889063bbc585 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -24,6 +24,13 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restart ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64 ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest +ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMerge +ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplits +ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsThenMerges +ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsAfterWait +ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsWhenWait +ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsAfterWait +ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsWhenWait ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInBS ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInLocalMetadata diff --git a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp index c6a00cdde5b1..cfbdd117d3e9 100644 --- a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp +++ b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp @@ -1,14 +1,16 @@ -#include "helpers/typed_local.h" #include "helpers/local.h" +#include "helpers/typed_local.h" #include "helpers/writer.h" -#include -#include + +#include #include -#include #include -#include #include -#include +#include +#include +#include +#include + #include #include @@ -276,7 +278,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { void WaitResharding(const TString& hint = "") { const TInstant start = TInstant::Now(); bool clean = false; - while (TInstant::Now() - start < TDuration::Seconds(20)) { + while (TInstant::Now() - start < TDuration::Seconds(200)) { NYdb::NOperation::TOperationClient operationClient(Kikimr.GetDriver()); auto result = operationClient.List().GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); @@ -408,7 +410,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { public: TAsyncReshardingTest() { - TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 128, 4); + TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 1024, 32); } void AddBatch(int numRows) { @@ -561,5 +563,120 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { tester.CheckCount(); } + + Y_UNIT_TEST(MultipleMerge) { + TAsyncReshardingTest tester; + tester.DisableCompaction(); + + tester.AddBatch(10000); + + for (int i = 0; i < 4; i++) { + tester.StartResharding("MERGE"); + tester.WaitResharding(); + } + + tester.RestartAllShards(); + + tester.CheckCount(); + } + + Y_UNIT_TEST(MultipleSplits) { + TAsyncReshardingTest tester; + tester.DisableCompaction(); + + tester.AddBatch(10000); + + for (int i = 0; i < 4; i++) { + tester.StartResharding("SPLIT"); + tester.WaitResharding(); + } + + tester.RestartAllShards(); + + tester.CheckCount(); + } + + Y_UNIT_TEST(MultipleSplitsThenMerges) { + TAsyncReshardingTest tester; + tester.DisableCompaction(); + + tester.AddBatch(10000); + + for (int i = 0; i < 4; i++) { + tester.StartResharding("SPLIT"); + tester.WaitResharding(); + } + + for (int i = 0; i < 8; i++) { + tester.StartResharding("MERGE"); + tester.WaitResharding(); + } + + tester.RestartAllShards(); + + tester.CheckCount(); + } + + Y_UNIT_TEST(MultipleSplitsWithRestartsAfterWait) { + TAsyncReshardingTest tester; + tester.DisableCompaction(); + + tester.AddBatch(10000); + + for (int i = 0; i < 4; i++) { + tester.StartResharding("SPLIT"); + tester.WaitResharding(); + tester.RestartAllShards(); + } + + tester.CheckCount(); + } + + Y_UNIT_TEST(MultipleSplitsWithRestartsWhenWait) { + TAsyncReshardingTest tester; + tester.DisableCompaction(); + + tester.AddBatch(10000); + + for (int i = 0; i < 4; i++) { + tester.StartResharding("SPLIT"); + tester.RestartAllShards(); + tester.WaitResharding(); + } + tester.RestartAllShards(); + + tester.CheckCount(); + } + + Y_UNIT_TEST(MultipleMergesWithRestartsAfterWait) { + TAsyncReshardingTest tester; + tester.DisableCompaction(); + + tester.AddBatch(10000); + + for (int i = 0; i < 4; i++) { + tester.StartResharding("MERGE"); + tester.WaitResharding(); + tester.RestartAllShards(); + } + + tester.CheckCount(); + } + + Y_UNIT_TEST(MultipleMergesWithRestartsWhenWait) { + TAsyncReshardingTest tester; + tester.DisableCompaction(); + + tester.AddBatch(10000); + + for (int i = 0; i < 4; i++) { + tester.StartResharding("MERGE"); + tester.RestartAllShards(); + tester.WaitResharding(); + } + tester.RestartAllShards(); + + tester.CheckCount(); + } } } diff --git a/ydb/core/tx/columnshard/transactions/operators/sharing.cpp b/ydb/core/tx/columnshard/transactions/operators/sharing.cpp index ec90f07c16eb..666ee719cb71 100644 --- a/ydb/core/tx/columnshard/transactions/operators/sharing.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/sharing.cpp @@ -52,11 +52,24 @@ void TSharingTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner } bool TSharingTransactionOperator::ProgressOnExecute( - TColumnShard& /*owner*/, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) { + TColumnShard& owner, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& txc) { + if (!SharingTask) { + return true; + } + if (!TxFinish) { + TxFinish = SharingTask->AckInitiatorFinished(&owner, SharingTask).DetachResult(); + } + TxFinish->Execute(txc, NActors::TActivationContext::AsActorContext()); + return true; } bool TSharingTransactionOperator::ProgressOnComplete(TColumnShard& owner, const TActorContext& ctx) { + if (!SharingTask) { + return true; + } + AFL_VERIFY(!!TxFinish); + TxFinish->Complete(ctx); for (TActorId subscriber : NotifySubscribers) { auto event = MakeHolder(owner.TabletID(), GetTxId()); ctx.Send(subscriber, event.Release(), 0, 0); diff --git a/ydb/core/tx/columnshard/transactions/operators/sharing.h b/ydb/core/tx/columnshard/transactions/operators/sharing.h index 13c7df7cad0e..c5c961d98ba2 100644 --- a/ydb/core/tx/columnshard/transactions/operators/sharing.h +++ b/ydb/core/tx/columnshard/transactions/operators/sharing.h @@ -17,6 +17,7 @@ class TSharingTransactionOperator: public IProposeTxOperator, public TMonitoring mutable std::unique_ptr TxPropose; mutable std::unique_ptr TxConfirm; mutable std::unique_ptr TxAbort; + mutable std::unique_ptr TxFinish; static inline auto Registrator = TFactory::TRegistrator(NKikimrTxColumnShard::TX_KIND_SHARING); THashSet NotifySubscribers; virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override;