Skip to content

Commit

Permalink
Merge e021073 into 9c8c951
Browse files Browse the repository at this point in the history
  • Loading branch information
fexolm authored Aug 29, 2024
2 parents 9c8c951 + e021073 commit 64acb19
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 38 deletions.
130 changes: 96 additions & 34 deletions ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,15 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
}

class TReshardingTest {
private:
YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64");
public:
TReshardingTest()
: Kikimr(TKikimrSettings().SetWithSampleTables(false))
, CSController(NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>())
, TableClient(Kikimr.GetTableClient()) {
CSController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
CSController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
CSController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
}

void WaitResharding(const TString& hint = "") {
const TInstant start = TInstant::Now();
Expand Down Expand Up @@ -308,22 +315,20 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
CompareYson(result, "[[" + ::ToString(expectation) + "u;]]");
}

protected:
TKikimrRunner Kikimr;
public:
NKikimr::NYDBTest::TControllers::TGuard<NKikimr::NYDBTest::NColumnShard::TController> CSController;
NYdb::NTable::TTableClient TableClient;
};

TReshardingTest()
: Kikimr(TKikimrSettings().SetWithSampleTables(false)) {
class TShardingTypeTest: public TReshardingTest {
YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64");

}
public:
using TReshardingTest::TReshardingTest;

void Execute() {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

TLocalHelper(Kikimr).SetShardingMethod(ShardingType).CreateTestOlapTable("olapTable", "olapStore", 24, 4);
auto tableClient = Kikimr.GetTableClient();

Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD, NKikimrServices::TX_COLUMNSHARD_SCAN }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();

Expand All @@ -340,71 +345,128 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
CheckCount(230000);
for (ui32 i = 0; i < 2; ++i) {
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
WaitResharding("SPLIT:" + ::ToString(i));
}
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
AFL_VERIFY(csController->GetShardingFiltersCount().Val() == 0);
AFL_VERIFY(CSController->GetShardingFiltersCount().Val() == 0);
CheckCount(230000);
i64 count = csController->GetShardingFiltersCount().Val();
i64 count = CSController->GetShardingFiltersCount().Val();
AFL_VERIFY(count >= 16)("count", count);
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
csController->WaitIndexation(TDuration::Seconds(3));
csController->WaitCompactions(TDuration::Seconds(3));
CSController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
CSController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
CSController->WaitIndexation(TDuration::Seconds(3));
CSController->WaitCompactions(TDuration::Seconds(3));
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
CheckCount(230000);
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
csController->WaitIndexation(TDuration::Seconds(5));
CSController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
CSController->WaitIndexation(TDuration::Seconds(5));
CheckCount(230000);
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
csController->WaitCompactions(TDuration::Seconds(5));
count = csController->GetShardingFiltersCount().Val();
CSController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
CSController->WaitCompactions(TDuration::Seconds(5));
count = CSController->GetShardingFiltersCount().Val();
CheckCount(230000);

csController->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable);
CSController->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable);

CheckCount(230000);

AFL_VERIFY(count == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
AFL_VERIFY(count == CSController->GetShardingFiltersCount().Val())("count", count)(
"val", CSController->GetShardingFiltersCount().Val());
const ui32 portionsCount = 16;
for (ui32 i = 0; i < 4; ++i) {
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
WaitResharding("MERGE:" + ::ToString(i));
// csController->WaitCleaning(TDuration::Seconds(5));
// CSController->WaitCleaning(TDuration::Seconds(5));

CheckCount(230000);
AFL_VERIFY(count + portionsCount == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
AFL_VERIFY(count + portionsCount == CSController->GetShardingFiltersCount().Val())("count", count)(
"val", CSController->GetShardingFiltersCount().Val());
count += portionsCount;
}
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
csController->CheckInvariants();
CSController->CheckInvariants();
}
};

Y_UNIT_TEST(TableReshardingConsistency64) {
TReshardingTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute();
TShardingTypeTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute();
}

Y_UNIT_TEST(TableReshardingModuloN) {
TReshardingTest().SetShardingType("HASH_FUNCTION_MODULO_N").Execute();
TShardingTypeTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute();
}

class TAsyncReshardingTest: public TReshardingTest {
YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64");

public:
TAsyncReshardingTest() {
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 24, 4);
}

void AddBatch(int numRows) {
WriteTestData(Kikimr, "/Root/olapStore/olapTable", LastPathId, LastTs, numRows);
LastPathId += numRows * 10;
LastTs += numRows * 10;
NumRows += numRows;
}

NYdb::EStatus StartResharding(TString modification) {
auto alterQuery =
TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=)"
<< modification << ");)";
auto session = TableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();

return alterResult.GetStatus();
}

void CheckCount() {
TReshardingTest::CheckCount(NumRows);
}

private:
ui64 LastPathId{ 1000000 };
ui64 LastTs{ 300000000 };
ui64 NumRows{ 0 };
};

Y_UNIT_TEST(UpsertWhileSplitTest) {
TAsyncReshardingTest tester;

tester.AddBatch(10000);

tester.CheckCount();

for (int i = 0; i < 4; i++) {
UNIT_ASSERT_VALUES_EQUAL_C(tester.StartResharding("SPLIT"), NYdb::EStatus::SUCCESS, "");
UNIT_ASSERT_VALUES_UNEQUAL_C(tester.StartResharding("SPLIT"), NYdb::EStatus::SUCCESS, "");

tester.CheckCount();
tester.AddBatch(10000);
tester.CheckCount();
tester.WaitResharding();
}
tester.AddBatch(10000);
tester.CheckCount();
}
}
}
6 changes: 2 additions & 4 deletions ydb/tests/olap/scenario/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ def get_suite_name(cls):

@classmethod
def setup_class(cls):
if not external_param_is_true('reuse-tables'):
ScenarioTestHelper(None).remove_path(cls.get_suite_name())
pass

@classmethod
def teardown_class(cls):
if not external_param_is_true('keep-tables'):
ScenarioTestHelper(None).remove_path(cls.get_suite_name())
pass

def test(self, ctx: TestContext):
allure_test_description(ctx.suite, ctx.test)
Expand Down
71 changes: 71 additions & 0 deletions ydb/tests/olap/scenario/helpers/table_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,37 @@ def title(self) -> str:
return f'add column `{self._column.name}`'


class AlterSharding(AlterTableAction):
"""Change shards count for tablestore.
Table-like objects are Tables and TableStore.
See {AlterTableLikeObject}.
Example:
sth = ScenarioTestHelper(ctx)
sth.execute_scheme_query(
AlterTable('testTable')
.action(AlterSharding("SPLIT"))
)
"""

def __init__(self, modification) -> None:
"""Constructor.
Args:
column: Column description."""

super().__init__()
self._modification = modification

@override
def to_yql(self) -> str:
return f'SET(ACTION=ALTER_SHARDING, MODIFICATION={self._modification})'

@override
def title(self) -> str:
return self.to_yql()

class DropColumn(AlterTableAction):
"""Remove a column from a table-like object.
Expand Down Expand Up @@ -426,3 +457,43 @@ class AlterTableStore(AlterTableLikeObject):
@override
def _type(self) -> str:
return 'tablestore'

class AlterObject(ScenarioTestHelper.IYqlble):
def __init__(self, name: str) -> None:
"""Constructor.
Args:
name: Name (relative path) of the altered object."""

super().__init__(name)
self._actions = []

@override
def params(self) -> Dict[str, str]:
return {self._type(): self._name, 'actions': ', '.join([a.title() for a in self._actions])}

@override
def title(self):
return f'alter object (TYPE TABLESTORE)'

@override
def to_yql(self, ctx: TestContext) -> str:
actions = ', '.join([a.to_yql() for a in self._actions])
return f'ALTER OBJECT `{ScenarioTestHelper(ctx).get_full_path(self._name)}` (TYPE TABLESTORE) {actions}'

def action(self, action: AlterTableAction) -> AlterTableLikeObject:
"""Add an action with an object.
Args:
action: Action on the object, such as creating or deleting a column.
Returns:
self."""

self._actions.append(action)
return self

@override
def _type(self) -> str:
return 'tablestore'

57 changes: 57 additions & 0 deletions ydb/tests/olap/scenario/test_partitioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from conftest import BaseTestSet
from ydb.tests.olap.scenario.helpers import (
ScenarioTestHelper,
TestContext,
CreateTable,
CreateTableStore,
AlterObject,
AlterSharding
)
from ydb import PrimitiveType, StatusCode
import ydb.tests.olap.scenario.helpers.data_generators as dg
import time


class TestPartitioning(BaseTestSet):
schema1 = (
ScenarioTestHelper.Schema()
.with_column(name='id', type=PrimitiveType.Int32, not_null=True)
.with_column(name='level', type=PrimitiveType.Uint32)
.with_key_columns('id')
)

def _test_table(self, ctx: TestContext, table_name: str):
sth = ScenarioTestHelper(ctx)
sth.execute_scheme_query(CreateTable(table_name).with_schema(self.schema1).with_partitions_count(2))

sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids")

sth.execute_scheme_query(AlterObject(table_name).action(AlterSharding("SPLIT")))

# sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids")

time.sleep(240)

sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids")

sth.execute_scheme_query(AlterObject(table_name).action(AlterSharding("SPLIT")))

# sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids")

time.sleep(240)

sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids")

sth.execute_scheme_query(AlterObject(table_name).action(AlterSharding("MERGE")))

# sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids")

# gen = dg.DataGeneratorPerColumn(self.schema1, 100, dg.ColumnValueGeneratorDefault(init_value=200))
# sth.bulk_upsert(table_name, gen, comment="100 sequetial ids")


def scenario_table(self, ctx: TestContext):
tablestore_name = "testStore5"
sth = ScenarioTestHelper(ctx)
sth.execute_scheme_query(CreateTableStore(tablestore_name).with_schema(self.schema1))
self._test_table(ctx, tablestore_name + '/testTable')
1 change: 1 addition & 0 deletions ydb/tests/olap/scenario/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ PY3TEST()
test_simple.py
test_scheme_load.py
test_alter_tiering.py
test_partitioning.py
)

PEERDIR(
Expand Down

0 comments on commit 64acb19

Please sign in to comment.