Skip to content

Commit

Permalink
Test: split shard with decimal PK (ydb-platform#14339)
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Feb 10, 2025
1 parent bfa1229 commit b1f11b4
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 47 deletions.
3 changes: 2 additions & 1 deletion ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2487,7 +2487,8 @@ namespace NSchemeShardUT_Private {

const auto& sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, datashardTabletId, sender, ev.Release());
runtime.GrabEdgeEvent<TEvDataShard::TEvUploadRowsResponse>(sender);
auto evResponse = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvUploadRowsResponse>(sender);
UNIT_ASSERT_C(evResponse->Get()->Record.GetStatus() == NKikimrTxDataShard::TError::OK, "Status: " << evResponse->Get()->Record.GetStatus() << " Issues: " << evResponse->Get()->Record.GetErrorDescription());
}

void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected) {
Expand Down
120 changes: 74 additions & 46 deletions ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@ using namespace NKikimr::NMiniKQL;
using namespace NSchemeShard;
using namespace NSchemeShardUT_Private;

namespace {

void WaitForTableSplit(TTestActorRuntime& runtime, const TString& path, size_t requiredPartitionCount = 10) {
while (true) {
TVector<THolder<IEventHandle>> suppressed;
auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvDataShard::TEvGetTableStatsResult::EventType);

WaitForSuppressed(runtime, suppressed, 1, prevObserver);
for (auto &msg : suppressed) {
runtime.Send(msg.Release());
}
suppressed.clear();

const auto result = DescribePath(runtime, path, true);
if (result.GetPathDescription().TablePartitionsSize() >= requiredPartitionCount)
return;
}
}
}

Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
Y_UNIT_TEST(Test) {
}
Expand Down Expand Up @@ -132,34 +152,66 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
)");
env.TestWaitNotification(runtime, txId);

while (true) {
TVector<THolder<IEventHandle>> suppressed;
auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvDataShard::TEvGetTableStatsResult::EventType);
WaitForTableSplit(runtime, "/MyRoot/Table");
}

WaitForSuppressed(runtime, suppressed, 1, prevObserver);
for (auto &msg : suppressed) {
runtime.Send(msg.Release());
}
suppressed.clear();
Y_UNIT_TEST(SplitShardsWithDecimalKey) {
TTestBasicRuntime runtime;

bool itIsEnough = false;
TTestEnvOptions opts;
opts.EnableBackgroundCompaction(false);

NLs::TCheckFunc checkPartitionCount = [&] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
if (record.GetPathDescription().TablePartitionsSize() >= 10) {
itIsEnough = true;
}
};
TTestEnv env(runtime, opts);

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{checkPartitionCount});
ui64 txId = 100;

if (itIsEnough) {
return;
}
NDataShard::gDbStatsReportInterval = TDuration::Seconds(1);
NDataShard::gDbStatsDataSizeResolution = 10;
NDataShard::gDbStatsRowCountResolution = 10;

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_ERROR);
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_ERROR);

TestCreateTable(runtime, ++txId, "/MyRoot", R"_(
Name: "Table"
Columns { Name: "key" Type: "Decimal(35, 10)"}
Columns { Name: "decimal_value" Type: "Decimal(2, 1)"}
Columns { Name: "string_value" Type: "Utf8"}
KeyColumnNames: ["key"]
)_");
env.TestWaitNotification(runtime, txId);

const std::pair<ui64, ui64> decimalValue = NYql::NDecimal::MakePair(
NYql::NDecimal::FromString("32.1", 2, 1));
TString stringValue(1000, 'A');

for (ui64 key = 0; key < 1000; ++key) {
const std::pair<ui64, ui64> decimalKey = NYql::NDecimal::MakePair(
NYql::NDecimal::FromString(Sprintf("%d.123456789", key * 1'000'000), 35, 10));
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2, 3},
{TCell::Make<std::pair<ui64, ui64>>(decimalKey)},
{TCell::Make<std::pair<ui64, ui64>>(decimalValue), TCell(stringValue)});
}

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{NLs::PartitionCount(1)});

TestAlterTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
PartitionConfig {
PartitioningPolicy {
MinPartitionsCount: 100
MaxPartitionsCount: 100
SizeToSplit: 1
}
}
)");
env.TestWaitNotification(runtime, txId);

WaitForTableSplit(runtime, "/MyRoot/Table");
}

Y_UNIT_TEST(SplitShardsWhithPgKey) {
Y_UNIT_TEST(SplitShardsWithPgKey) {
TTestBasicRuntime runtime;

TTestEnvOptions opts;
Expand All @@ -185,7 +237,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
)");
env.TestWaitNotification(runtime, txId);

TString valueString = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
TString valueString(1000, 'A');;
for (ui64 key = 0; key < 1000; ++key) {
auto pgKey = NPg::PgNativeBinaryFromNativeText(ToString(key * 1'000'000), NPg::TypeDescFromPgTypeName("pgint8")).Str;
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell(pgKey)}, {TCell(valueString)});
Expand All @@ -206,31 +258,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
)");
env.TestWaitNotification(runtime, txId);

while (true) {
TVector<THolder<IEventHandle>> suppressed;
auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvDataShard::TEvGetTableStatsResult::EventType);

WaitForSuppressed(runtime, suppressed, 1, prevObserver);
for (auto &msg : suppressed) {
runtime.Send(msg.Release());
}
suppressed.clear();

bool itIsEnough = false;

NLs::TCheckFunc checkPartitionCount = [&] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
if (record.GetPathDescription().TablePartitionsSize() >= 10) {
itIsEnough = true;
}
};

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{checkPartitionCount});

if (itIsEnough) {
return;
}
}
WaitForTableSplit(runtime, "/MyRoot/Table");
}

Y_UNIT_TEST(Merge1KShards) {
Expand Down

0 comments on commit b1f11b4

Please sign in to comment.