Skip to content

Commit

Permalink
Add UpsertWhileSplitTest (ydb-platform#8472)
Browse files Browse the repository at this point in the history
Conflicts:
	ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
  • Loading branch information
fexolm authored and zverevgeny committed Sep 11, 2024
1 parent 4733b09 commit c35dea1
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 39 deletions.
151 changes: 116 additions & 35 deletions ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,9 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
}

class TReshardingTest {
private:
YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64");
public:
TReshardingTest()
: Kikimr(GetKikimrSettings())
: Kikimr(TKikimrSettings().SetWithSampleTables(false))
, CSController(NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>())
, TableClient(Kikimr.GetTableClient()) {
CSController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
Expand Down Expand Up @@ -309,22 +307,20 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
CompareYson(result, "[[" + ::ToString(expectation) + "u;]]");
}

protected:
TKikimrRunner Kikimr;
public:
NKikimr::NYDBTest::TControllers::TGuard<NKikimr::NYDBTest::NColumnShard::TController> CSController;
NYdb::NTable::TTableClient TableClient;
};

TReshardingTest()
: Kikimr(TKikimrSettings().SetWithSampleTables(false)) {
class TShardingTypeTest: public TReshardingTest {
YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64");

}
public:
using TReshardingTest::TReshardingTest;

void Execute() {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

TLocalHelper(Kikimr).SetShardingMethod(ShardingType).CreateTestOlapTable("olapTable", "olapStore", 24, 4);
auto tableClient = Kikimr.GetTableClient();

Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD, NKikimrServices::TX_COLUMNSHARD_SCAN }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();

Expand All @@ -341,71 +337,156 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
CheckCount(230000);
for (ui32 i = 0; i < 2; ++i) {
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
WaitResharding("SPLIT:" + ::ToString(i));
}
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
AFL_VERIFY(csController->GetShardingFiltersCount().Val() == 0);
AFL_VERIFY(CSController->GetShardingFiltersCount().Val() == 0);
CheckCount(230000);
i64 count = csController->GetShardingFiltersCount().Val();
i64 count = CSController->GetShardingFiltersCount().Val();
AFL_VERIFY(count >= 16)("count", count);
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
csController->WaitIndexation(TDuration::Seconds(3));
csController->WaitCompactions(TDuration::Seconds(3));
CSController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
CSController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
CSController->WaitIndexation(TDuration::Seconds(3));
CSController->WaitCompactions(TDuration::Seconds(3));
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
CheckCount(230000);
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
csController->WaitIndexation(TDuration::Seconds(5));
CSController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
CSController->WaitIndexation(TDuration::Seconds(5));
CheckCount(230000);
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
csController->WaitCompactions(TDuration::Seconds(5));
count = csController->GetShardingFiltersCount().Val();
CSController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
CSController->WaitCompactions(TDuration::Seconds(5));
count = CSController->GetShardingFiltersCount().Val();
CheckCount(230000);

csController->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable);
CSController->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable);

CheckCount(230000);

AFL_VERIFY(count == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
AFL_VERIFY(count == CSController->GetShardingFiltersCount().Val())("count", count)(
"val", CSController->GetShardingFiltersCount().Val());
const ui32 portionsCount = 16;
for (ui32 i = 0; i < 4; ++i) {
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
WaitResharding("MERGE:" + ::ToString(i));
// csController->WaitCleaning(TDuration::Seconds(5));
// CSController->WaitCleaning(TDuration::Seconds(5));

CheckCount(230000);
AFL_VERIFY(count + portionsCount == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
AFL_VERIFY(count + portionsCount == CSController->GetShardingFiltersCount().Val())("count", count)(
"val", CSController->GetShardingFiltersCount().Val());
count += portionsCount;
}
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
csController->CheckInvariants();
CSController->CheckInvariants();
}
};

Y_UNIT_TEST(TableReshardingConsistency64) {
TReshardingTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute();
TShardingTypeTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute();
}

Y_UNIT_TEST(TableReshardingModuloN) {
TReshardingTest().SetShardingType("HASH_FUNCTION_MODULO_N").Execute();
TShardingTypeTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute();
}

class TAsyncReshardingTest: public TReshardingTest {
YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64");

public:
TAsyncReshardingTest() {
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 24, 4);
}

void AddBatch(int numRows) {
WriteTestData(Kikimr, "/Root/olapStore/olapTable", LastPathId, LastTs, numRows);
LastPathId += numRows * 10;
LastTs += numRows * 10;
NumRows += numRows;
}

void StartResharding(TString modification) {
auto alterQuery =
TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=)"
<< modification << ");";
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

void CheckCount() {
TReshardingTest::CheckCount(NumRows);
}

void ChangeSchema() {
auto alterQuery =
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=level, "
"`SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, "
"`COMPRESSION.TYPE`=`zstd`);";
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

void DisableCompaction() {
CSController->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable);
}

private:
ui64 LastPathId = 1000000;
ui64 LastTs = 300000000;
ui64 NumRows = 0;
};

Y_UNIT_TEST(UpsertWhileSplitTest) {
TAsyncReshardingTest tester;

tester.AddBatch(10000);

tester.CheckCount();

for (int i = 0; i < 4; i++) {
tester.StartResharding("SPLIT");

tester.CheckCount();
tester.AddBatch(10000);
tester.CheckCount();
tester.WaitResharding();
}
tester.AddBatch(10000);
tester.CheckCount();
}

Y_UNIT_TEST(ChangeSchemaAndSplit) {
TAsyncReshardingTest tester;
tester.DisableCompaction();

tester.AddBatch(10000);
tester.ChangeSchema();
tester.AddBatch(10000);

tester.StartResharding("SPLIT");
tester.WaitResharding();

tester.CheckCount();
}
}
}
Loading

0 comments on commit c35dea1

Please sign in to comment.