Skip to content

Commit

Permalink
Remove destination session after partitioning finish (#11411)
Browse files Browse the repository at this point in the history
  • Loading branch information
fexolm authored Nov 11, 2024
1 parent 02d2031 commit 9b3c2bd
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 9 deletions.
7 changes: 7 additions & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restart
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMerge
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplits
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsThenMerges
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsAfterWait
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsWhenWait
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsAfterWait
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsWhenWait
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInBS
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInLocalMetadata
Expand Down
133 changes: 125 additions & 8 deletions ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#include "helpers/typed_local.h"
#include "helpers/local.h"
#include "helpers/typed_local.h"
#include "helpers/writer.h"
#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/tx/columnshard/data_sharing/initiator/status/abstract.h>
#include <ydb/core/tx/columnshard/data_sharing/common/context/context.h>
#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
#include <ydb/core/tx/columnshard/data_sharing/destination/events/control.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
#include <ydb/core/tx/columnshard/data_sharing/initiator/status/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>

#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_ss_tasks/task.h>

Expand Down Expand Up @@ -276,7 +278,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
void WaitResharding(const TString& hint = "") {
const TInstant start = TInstant::Now();
bool clean = false;
while (TInstant::Now() - start < TDuration::Seconds(20)) {
while (TInstant::Now() - start < TDuration::Seconds(200)) {
NYdb::NOperation::TOperationClient operationClient(Kikimr.GetDriver());
auto result = operationClient.List<NYdb::NSchemeShard::TBackgroundProcessesResponse>().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
Expand Down Expand Up @@ -408,7 +410,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {

public:
TAsyncReshardingTest() {
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 128, 4);
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 1024, 32);
}

void AddBatch(int numRows) {
Expand Down Expand Up @@ -561,5 +563,120 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {

tester.CheckCount();
}

Y_UNIT_TEST(MultipleMerge) {
TAsyncReshardingTest tester;
tester.DisableCompaction();

tester.AddBatch(10000);

for (int i = 0; i < 4; i++) {
tester.StartResharding("MERGE");
tester.WaitResharding();
}

tester.RestartAllShards();

tester.CheckCount();
}

Y_UNIT_TEST(MultipleSplits) {
TAsyncReshardingTest tester;
tester.DisableCompaction();

tester.AddBatch(10000);

for (int i = 0; i < 4; i++) {
tester.StartResharding("SPLIT");
tester.WaitResharding();
}

tester.RestartAllShards();

tester.CheckCount();
}

Y_UNIT_TEST(MultipleSplitsThenMerges) {
TAsyncReshardingTest tester;
tester.DisableCompaction();

tester.AddBatch(10000);

for (int i = 0; i < 4; i++) {
tester.StartResharding("SPLIT");
tester.WaitResharding();
}

for (int i = 0; i < 8; i++) {
tester.StartResharding("MERGE");
tester.WaitResharding();
}

tester.RestartAllShards();

tester.CheckCount();
}

Y_UNIT_TEST(MultipleSplitsWithRestartsAfterWait) {
TAsyncReshardingTest tester;
tester.DisableCompaction();

tester.AddBatch(10000);

for (int i = 0; i < 4; i++) {
tester.StartResharding("SPLIT");
tester.WaitResharding();
tester.RestartAllShards();
}

tester.CheckCount();
}

Y_UNIT_TEST(MultipleSplitsWithRestartsWhenWait) {
TAsyncReshardingTest tester;
tester.DisableCompaction();

tester.AddBatch(10000);

for (int i = 0; i < 4; i++) {
tester.StartResharding("SPLIT");
tester.RestartAllShards();
tester.WaitResharding();
}
tester.RestartAllShards();

tester.CheckCount();
}

Y_UNIT_TEST(MultipleMergesWithRestartsAfterWait) {
TAsyncReshardingTest tester;
tester.DisableCompaction();

tester.AddBatch(10000);

for (int i = 0; i < 4; i++) {
tester.StartResharding("MERGE");
tester.WaitResharding();
tester.RestartAllShards();
}

tester.CheckCount();
}

Y_UNIT_TEST(MultipleMergesWithRestartsWhenWait) {
TAsyncReshardingTest tester;
tester.DisableCompaction();

tester.AddBatch(10000);

for (int i = 0; i < 4; i++) {
tester.StartResharding("MERGE");
tester.RestartAllShards();
tester.WaitResharding();
}
tester.RestartAllShards();

tester.CheckCount();
}
}
}
15 changes: 14 additions & 1 deletion ydb/core/tx/columnshard/transactions/operators/sharing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,24 @@ void TSharingTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner
}

bool TSharingTransactionOperator::ProgressOnExecute(
TColumnShard& /*owner*/, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) {
TColumnShard& owner, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& txc) {
if (!SharingTask) {
return true;
}
if (!TxFinish) {
TxFinish = SharingTask->AckInitiatorFinished(&owner, SharingTask).DetachResult();
}
TxFinish->Execute(txc, NActors::TActivationContext::AsActorContext());

return true;
}

bool TSharingTransactionOperator::ProgressOnComplete(TColumnShard& owner, const TActorContext& ctx) {
if (!SharingTask) {
return true;
}
AFL_VERIFY(!!TxFinish);
TxFinish->Complete(ctx);
for (TActorId subscriber : NotifySubscribers) {
auto event = MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(owner.TabletID(), GetTxId());
ctx.Send(subscriber, event.Release(), 0, 0);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/transactions/operators/sharing.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class TSharingTransactionOperator: public IProposeTxOperator, public TMonitoring
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxPropose;
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxConfirm;
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxAbort;
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxFinish;
static inline auto Registrator = TFactory::TRegistrator<TSharingTransactionOperator>(NKikimrTxColumnShard::TX_KIND_SHARING);
THashSet<TActorId> NotifySubscribers;
virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override;
Expand Down

0 comments on commit 9b3c2bd

Please sign in to comment.