Skip to content

Commit

Permalink
add stats test for internal column table from tables store (ydb-platf…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Sep 10, 2024
1 parent 0c46900 commit 390afc6
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi
for (auto&& i : batch->schema()->fields()) {
AFL_VERIFY(GetIndexInfo().HasColumnName(i->name()));
if (!dstSchema->GetFieldByName(i->name())->Equals(i)) {
return TConclusionStatus::Fail("not equal field types for column '" + i->name() + "'");
return TConclusionStatus::Fail("not equal field types for column '" + i->name() + "': " + i->ToString() + " vs " +
dstSchema->GetFieldByName(i->name())->ToString());
}
if (GetIndexInfo().IsNullableVerified(i->name())) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
namespace NKikimr::NOlap {

void TBuildBatchesTask::ReplyError(const TString& message) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", "cannot build batch for insert")("reason", message)("data", WriteData.GetWriteMeta().GetLongTxIdOptional());
auto writeDataPtr = std::make_shared<NEvWrite::TWriteData>(std::move(WriteData));
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(*writeDataPtr) });
auto result = NColumnShard::TEvPrivate::TEvWriteBlobsResult::Error(NKikimrProto::EReplyStatus::CORRUPTED, std::move(buffer), message);
Expand Down
16 changes: 10 additions & 6 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,26 @@ bool WriteDataImpl(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shar
}

bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds, const NEvWrite::EModificationType mType) {
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds, const NEvWrite::EModificationType mType,
const std::set<std::string>& notNullColumns) {
NLongTxService::TLongTxId longTxId;
UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
return WriteDataImpl(runtime, sender, shardId, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds, mType);

return WriteDataImpl(
runtime, sender, shardId, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema, notNullColumns), writeIds, mType);
}

bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 writeId, const ui64 tableId, const TString& data,
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, bool waitResult, std::vector<ui64>* writeIds, const NEvWrite::EModificationType mType) {
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, bool waitResult, std::vector<ui64>* writeIds,
const NEvWrite::EModificationType mType, const std::set<std::string>& notNullColumns) {
NLongTxService::TLongTxId longTxId;
UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
if (writeIds) {
return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds, mType);
return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data,
NArrow::MakeArrowSchema(ydbSchema, notNullColumns), writeIds, mType);
}
std::vector<ui64> ids;
return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), waitResult ? &ids : nullptr, mType);
return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data,
NArrow::MakeArrowSchema(ydbSchema, notNullColumns), waitResult ? &ids : nullptr, mType);
}

std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,12 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot
void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);

bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds, const NEvWrite::EModificationType mType = NEvWrite::EModificationType::Upsert);
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds,
const NEvWrite::EModificationType mType = NEvWrite::EModificationType::Upsert, const std::set<std::string>& notNullColumns = {});

bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 writeId, const ui64 tableId, const TString& data,
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, bool waitResult = true, std::vector<ui64>* writeIds = nullptr, const NEvWrite::EModificationType mType = NEvWrite::EModificationType::Upsert);
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, bool waitResult = true, std::vector<ui64>* writeIds = nullptr,
const NEvWrite::EModificationType mType = NEvWrite::EModificationType::Upsert, const std::set<std::string>& notNullColumns = {});

std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
ui64 tableId, const ui64 writePartId, const TString& data,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/test_helper/helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class TTestColumn {
YDB_ACCESSOR_DEF(TString, Name);
YDB_ACCESSOR_DEF(NScheme::TTypeInfo, Type);
YDB_ACCESSOR_DEF(TString, StorageId);

public:
explicit TTestColumn(const TString& name, const NScheme::TTypeInfo& type)
: Name(name)
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr path
}
if (tableInfo->GetStats().TableStats.contains(pathId)) {
FillTableStats(*pathDescription, tableInfo->GetStats().TableStats.at(pathId));
} else {
FillTableStats(*pathDescription, TPartitionStats());
}
}
}
Expand Down
46 changes: 34 additions & 12 deletions ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>

Expand Down Expand Up @@ -634,17 +635,17 @@ Y_UNIT_TEST_SUITE(TOlap) {
env.TestWaitNotification(runtime, txId);
}

// TODO: AlterTiers
// negatives for store: disallow alters
// negatives for table: wrong tiers count, wrong tiers, wrong eviction column, wrong eviction values,
// different TTL columns in tiers
#if 0
Y_UNIT_TEST(StoreStats) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
runtime.UpdateCurrentTime(TInstant::Now() - TDuration::Seconds(600));

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

// disable stats batching
auto& appData = runtime.GetAppData();
appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0);
Expand Down Expand Up @@ -690,6 +691,16 @@ Y_UNIT_TEST_SUITE(TOlap) {
UNIT_ASSERT(shardId);
UNIT_ASSERT(pathId);
UNIT_ASSERT(planStep);
{
auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/OlapStore/ColumnTable", true, true);
Cerr << description.DebugString() << Endl;
auto& tabletStats = description.GetPathDescription().GetTableStats();

UNIT_ASSERT(description.GetPathDescription().HasTableStats());
UNIT_ASSERT_EQUAL(tabletStats.GetRowCount(), 0);
UNIT_ASSERT_EQUAL(tabletStats.GetDataSize(), 0);
}


ui32 rowsInBatch = 100000;

Expand All @@ -702,7 +713,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
TSet<ui64> txIds;
for (ui32 i = 0; i < 10; ++i) {
std::vector<ui64> writeIds;
NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds);
NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Upsert, { "timestamp" });
NTxUT::ProposeCommit(runtime, sender, shardId, ++txId, writeIds);
txIds.insert(txId);
}
Expand All @@ -714,16 +725,28 @@ Y_UNIT_TEST_SUITE(TOlap) {

// trigger periodic stats at shard (after timeout)
std::vector<ui64> writeIds;
NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds);
NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Upsert, { "timestamp" });
NTxUT::ProposeCommit(runtime, sender, shardId, ++txId, writeIds);
NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, {txId});
}
csController->WaitIndexation(TDuration::Seconds(5));
{
auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/OlapStore", true, true);
auto& tabletStats = description.GetPathDescription().GetTableStats();

auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/OlapStore", true, true);
auto& tabletStats = description.GetPathDescription().GetTableStats();
UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0);
UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0);
}

{
auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/OlapStore/ColumnTable", true, true);
Cerr << description.DebugString() << Endl;
auto& tabletStats = description.GetPathDescription().GetTableStats();

UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0);
UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0);
}

UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0);
UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0);
#if 0
TestDropColumnTable(runtime, ++txId, "/MyRoot/OlapStore", "ColumnTable");
env.TestWaitNotification(runtime, txId);
Expand All @@ -738,5 +761,4 @@ Y_UNIT_TEST_SUITE(TOlap) {
TestLsPathId(runtime, 2, NLs::PathStringEqual(""));
#endif
}
#endif
}

0 comments on commit 390afc6

Please sign in to comment.