Skip to content

Commit

Permalink
Test: split shard with decimal PK
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin committed Feb 7, 2025
1 parent 94b1c24 commit 805e8ed
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 47 deletions.
31 changes: 30 additions & 1 deletion ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2487,7 +2487,8 @@ namespace NSchemeShardUT_Private {

const auto& sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, datashardTabletId, sender, ev.Release());
runtime.GrabEdgeEvent<TEvDataShard::TEvUploadRowsResponse>(sender);
auto evResponse = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvUploadRowsResponse>(sender);
UNIT_ASSERT_C(evResponse->Get()->Record.GetStatus() == NKikimrTxDataShard::TError::OK, "Status: " << evResponse->Get()->Record.GetStatus() << " Issues: " << evResponse->Get()->Record.GetErrorDescription());
}

void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected) {
Expand Down Expand Up @@ -2537,4 +2538,32 @@ namespace NSchemeShardUT_Private {
SendNextValRequest(runtime, sender, path);
return WaitNextValResult(runtime, sender, expectedStatus);
}

void WaitForTableSplit(TTestActorRuntime& runtime, const TString& path) {
while (true) {
TVector<THolder<IEventHandle>> suppressed;
auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvDataShard::TEvGetTableStatsResult::EventType);

WaitForSuppressed(runtime, suppressed, 1, prevObserver);
for (auto &msg : suppressed) {
runtime.Send(msg.Release());
}
suppressed.clear();

bool itIsEnough = false;

NLs::TCheckFunc checkPartitionCount = [&] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
if (record.GetPathDescription().TablePartitionsSize() >= 10) {
itIsEnough = true;
}
};

TestDescribeResult(DescribePath(runtime, path, true),
{checkPartitionCount});

if (itIsEnough) {
return;
}
}
}
}
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -636,4 +636,5 @@ namespace NSchemeShardUT_Private {
TTestActorRuntime& runtime, const TString& path,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);

void WaitForTableSplit(TTestActorRuntime& runtime, const TString& path);
} //NSchemeShardUT_Private
100 changes: 54 additions & 46 deletions ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,34 +132,66 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
)");
env.TestWaitNotification(runtime, txId);

while (true) {
TVector<THolder<IEventHandle>> suppressed;
auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvDataShard::TEvGetTableStatsResult::EventType);
WaitForTableSplit(runtime, "/MyRoot/Table");
}

WaitForSuppressed(runtime, suppressed, 1, prevObserver);
for (auto &msg : suppressed) {
runtime.Send(msg.Release());
}
suppressed.clear();
Y_UNIT_TEST(SplitShardsWithDecimalKey) {
TTestBasicRuntime runtime;

bool itIsEnough = false;
TTestEnvOptions opts;
opts.EnableBackgroundCompaction(false);

NLs::TCheckFunc checkPartitionCount = [&] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
if (record.GetPathDescription().TablePartitionsSize() >= 10) {
itIsEnough = true;
}
};
TTestEnv env(runtime, opts);

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{checkPartitionCount});
ui64 txId = 100;

if (itIsEnough) {
return;
}
NDataShard::gDbStatsReportInterval = TDuration::Seconds(1);
NDataShard::gDbStatsDataSizeResolution = 10;
NDataShard::gDbStatsRowCountResolution = 10;

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_ERROR);
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_ERROR);

TestCreateTable(runtime, ++txId, "/MyRoot", R"_(
Name: "Table"
Columns { Name: "key" Type: "Decimal(35, 10)"}
Columns { Name: "decimal_value" Type: "Decimal(2, 1)"}
Columns { Name: "string_value" Type: "Utf8"}
KeyColumnNames: ["key"]
)_");
env.TestWaitNotification(runtime, txId);

const std::pair<ui64, ui64> decimalValue = NYql::NDecimal::MakePair(
NYql::NDecimal::FromString("32.1", 2, 1));
TString stringValue(1000, 'A');

for (ui64 key = 0; key < 1000; ++key) {
const std::pair<ui64, ui64> decimalKey = NYql::NDecimal::MakePair(
NYql::NDecimal::FromString(Sprintf("%d.123456789", key * 1'000'000), 35, 10));
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2, 3},
{TCell::Make<std::pair<ui64, ui64>>(decimalKey)},
{TCell::Make<std::pair<ui64, ui64>>(decimalValue), TCell(stringValue)});
}

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{NLs::PartitionCount(1)});

TestAlterTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
PartitionConfig {
PartitioningPolicy {
MinPartitionsCount: 100
MaxPartitionsCount: 100
SizeToSplit: 1
}
}
)");
env.TestWaitNotification(runtime, txId);

WaitForTableSplit(runtime, "/MyRoot/Table");
}

Y_UNIT_TEST(SplitShardsWhithPgKey) {
Y_UNIT_TEST(SplitShardsWithPgKey) {
TTestBasicRuntime runtime;

TTestEnvOptions opts;
Expand All @@ -185,7 +217,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
)");
env.TestWaitNotification(runtime, txId);

TString valueString = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
TString valueString(1000, 'A');;
for (ui64 key = 0; key < 1000; ++key) {
auto pgKey = NPg::PgNativeBinaryFromNativeText(ToString(key * 1'000'000), NPg::TypeDescFromPgTypeName("pgint8")).Str;
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell(pgKey)}, {TCell(valueString)});
Expand All @@ -206,31 +238,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
)");
env.TestWaitNotification(runtime, txId);

while (true) {
TVector<THolder<IEventHandle>> suppressed;
auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvDataShard::TEvGetTableStatsResult::EventType);

WaitForSuppressed(runtime, suppressed, 1, prevObserver);
for (auto &msg : suppressed) {
runtime.Send(msg.Release());
}
suppressed.clear();

bool itIsEnough = false;

NLs::TCheckFunc checkPartitionCount = [&] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
if (record.GetPathDescription().TablePartitionsSize() >= 10) {
itIsEnough = true;
}
};

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{checkPartitionCount});

if (itIsEnough) {
return;
}
}
WaitForTableSplit(runtime, "/MyRoot/Table");
}

Y_UNIT_TEST(Merge1KShards) {
Expand Down

0 comments on commit 805e8ed

Please sign in to comment.