diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 044d4c9a7f2b..b3c919ed67b6 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -18,6 +18,7 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.ReadHuge ydb/core/keyvalue/ut_trace TKeyValueTracingTest.ReadSmall ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteHuge ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteSmall +ydb/core/kqp/ut/cost KqpCost.OlapWriteRow ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertEvWrite ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertViaLegacyScripting-Streaming diff --git a/ydb/core/kqp/common/buffer/events.h b/ydb/core/kqp/common/buffer/events.h index 291de670246a..0dad898ff33b 100644 --- a/ydb/core/kqp/common/buffer/events.h +++ b/ydb/core/kqp/common/buffer/events.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -28,6 +29,10 @@ struct TEvFlush : public TEventLocal }; struct TEvResult : public TEventLocal { + TEvResult() = default; + TEvResult(NYql::NDqProto::TDqTaskStats&& stats) : Stats(std::move(stats)) {} + + std::optional Stats; }; struct TEvError : public TEventLocal { diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 27b0b948409e..cc04706e23ff 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -332,7 +332,12 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Stats) { + if (Stats) { + Stats->AddBufferStats(std::move(*ev->Get()->Stats)); + } + } MakeResponseAndPassAway(); } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 177fd8567ff0..1a75bdde1563 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -890,6 +890,26 @@ void TQueryExecutionStats::AddDatashardStats(NKikimrQueryStats::TTxStats&& txSta } } +void TQueryExecutionStats::AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskStats) { + for (auto& table : taskStats.GetTables()) { + NYql::NDqProto::TDqTableStats* tableAggr = nullptr; + if (auto it = TableStats.find(table.GetTablePath()); it != TableStats.end()) { + tableAggr = it->second; + } else { + tableAggr = Result->AddTables(); + tableAggr->SetTablePath(table.GetTablePath()); + TableStats.emplace(table.GetTablePath(), tableAggr); + } + + tableAggr->SetReadRows(tableAggr->GetReadRows() + table.GetReadRows()); + tableAggr->SetReadBytes(tableAggr->GetReadBytes() + table.GetReadBytes()); + tableAggr->SetWriteRows(tableAggr->GetWriteRows() + table.GetWriteRows()); + tableAggr->SetWriteBytes(tableAggr->GetWriteBytes() + table.GetWriteBytes()); + tableAggr->SetEraseRows(tableAggr->GetEraseRows() + table.GetEraseRows()); + tableAggr->SetAffectedPartitions(table.GetAffectedPartitions()); + } +} + void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats) { Y_ASSERT(stats.GetTasks().size() == 1); const NYql::NDqProto::TDqTaskStats& taskStats = stats.GetTasks(0); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index 6a7f9d94e821..42f5e6c181cf 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -200,6 +200,7 @@ struct TQueryExecutionStats { TDuration collectLongTaskStatsTimeout = TDuration::Max() ); void AddDatashardStats(NKikimrQueryStats::TTxStats&& txStats); + void AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskStats); void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats); void ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats); diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index a6d7759f5bc8..5ad7afe89d32 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -127,6 +128,17 @@ struct IKqpTableWriterCallbacks { virtual void OnError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues) = 0; }; +struct TKqpTableWriterStatistics { + ui64 ReadRows = 0; + ui64 ReadBytes = 0; + ui64 WriteRows = 0; + ui64 WriteBytes = 0; + ui64 EraseRows = 0; + ui64 EraseBytes = 0; + + THashSet AffectedPartitions; +}; + class TKqpTableWriteActor : public TActorBootstrapped { using TBase = TActorBootstrapped; @@ -492,8 +504,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { return builder; }() << ", Cookie=" << ev->Cookie); - - + UpdateStats(ev->Get()->Record.GetTxStats()); switch (ev->Get()->GetStatus()) { case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: { @@ -955,6 +966,56 @@ class TKqpTableWriteActor : public TActorBootstrapped { Send(this->SelfId(), new TEvPrivate::TEvTerminate{}); } + void UpdateStats(const NKikimrQueryStats::TTxStats& txStats) { + for (const auto& tableAccessStats : txStats.GetTableAccessStats()) { + YQL_ENSURE(tableAccessStats.GetTableInfo().GetPathId() == TableId.PathId.LocalPathId); + Stats.ReadRows += tableAccessStats.GetSelectRow().GetRows(); + Stats.ReadRows += tableAccessStats.GetSelectRange().GetRows(); + Stats.ReadBytes += tableAccessStats.GetSelectRow().GetBytes(); + Stats.ReadBytes += tableAccessStats.GetSelectRange().GetBytes(); + Stats.WriteRows += tableAccessStats.GetUpdateRow().GetRows(); + Stats.WriteBytes += tableAccessStats.GetUpdateRow().GetBytes(); + Stats.EraseRows += tableAccessStats.GetEraseRow().GetRows(); + Stats.EraseBytes += tableAccessStats.GetEraseRow().GetRows(); + } + + for (const auto& perShardStats : txStats.GetPerShardStats()) { + Stats.AffectedPartitions.insert(perShardStats.GetShardId()); + } + } + + void FillStats(NYql::NDqProto::TDqTaskStats* stats) { + NYql::NDqProto::TDqTableStats* tableStats = nullptr; + for (size_t i = 0; i < stats->TablesSize(); ++i) { + auto* table = stats->MutableTables(i); + if (table->GetTablePath() == TablePath) { + tableStats = table; + } + } + if (!tableStats) { + tableStats = stats->AddTables(); + tableStats->SetTablePath(TablePath); + } + + tableStats->SetReadRows(tableStats->GetReadRows() + Stats.ReadRows); + tableStats->SetReadBytes(tableStats->GetReadBytes() + Stats.ReadBytes); + tableStats->SetWriteRows(tableStats->GetWriteRows() + Stats.WriteRows); + tableStats->SetWriteBytes(tableStats->GetWriteBytes() + Stats.WriteBytes); + tableStats->SetEraseRows(tableStats->GetEraseRows() + Stats.EraseRows); + tableStats->SetEraseBytes(tableStats->GetEraseBytes() + Stats.EraseBytes); + + Stats.ReadRows = 0; + Stats.ReadBytes = 0; + Stats.WriteRows = 0; + Stats.WriteBytes = 0; + Stats.EraseRows = 0; + Stats.EraseBytes = 0; + + tableStats->SetAffectedPartitions( + tableStats->GetAffectedPartitions() + Stats.AffectedPartitions.size()); + Stats.AffectedPartitions.clear(); + } + NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false); TString LogPrefix; @@ -987,6 +1048,8 @@ class TKqpTableWriteActor : public TActorBootstrapped { TIntrusivePtr Counters; + TKqpTableWriterStatistics Stats; + NWilson::TSpan TableWriteActorSpan; NWilson::TSpan TableWriteActorStateSpan; }; @@ -1208,6 +1271,12 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu RuntimeError(message, statusCode, subIssues); } + void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats, bool last, const NYql::NDq::TDqMeteringStats*) override { + if (last && WriteTableActor) { + WriteTableActor->FillStats(stats); + } + } + TString LogPrefix; const NKikimrKqp::TKqpTableSinkSettings Settings; TWriteActorSettings MessageSettings; @@ -2018,7 +2087,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub if (TxManager->ConsumeCommitResult(shardId)) { CA_LOG_D("Committed"); State = EState::FINISHED; - Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); + Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{ + BuildStats() + }); ExecuterActorId = {}; Y_ABORT_UNLESS(GetTotalMemory() == 0); return; @@ -2035,7 +2106,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub "BufferWriteActorState::Writing", NWilson::EFlags::AUTO_END); CA_LOG_D("Flushed"); State = EState::WRITING; - Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); + Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{ + BuildStats() + }); ExecuterActorId = {}; Y_ABORT_UNLESS(GetTotalMemory() == 0); } @@ -2062,6 +2135,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub PassAway(); } + NYql::NDqProto::TDqTaskStats BuildStats() { + NYql::NDqProto::TDqTaskStats result; + for (const auto& [_, writeInfo] : WriteInfos) { + writeInfo.WriteTableActor->FillStats(&result); + } + return result; + } + private: TString LogPrefix; const TActorId SessionActorId; diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 87eeaff595ac..e12223f47ab6 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -1,5 +1,6 @@ #include +#include #include #include @@ -11,11 +12,13 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NTable; -static NKikimrConfig::TAppConfig GetAppConfig(bool scanSourceRead = false, bool streamLookup = true, bool streamLookupJoin = false) { +static NKikimrConfig::TAppConfig GetAppConfig(bool scanSourceRead = false, bool streamLookup = true, bool streamLookupJoin = false, bool enableOltpSink = false) { auto app = NKikimrConfig::TAppConfig(); app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(scanSourceRead); app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup); app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(streamLookupJoin); + app.MutableTableServiceConfig()->SetEnableOlapSink(true); + app.MutableTableServiceConfig()->SetEnableOltpSink(enableOltpSink); return app; } @@ -25,6 +28,11 @@ static NYdb::NTable::TExecDataQuerySettings GetDataQuerySettings() { return execSettings; } +static NYdb::NQuery::TExecuteQuerySettings GetQuerySettings() { + NYdb::NQuery::TExecuteQuerySettings execSettings; + execSettings.StatsMode(NYdb::NQuery::EStatsMode::Basic); + return execSettings; +} static void CreateSampleTables(TSession session) { UNIT_ASSERT(session.ExecuteSchemeQuery(R"( @@ -102,8 +110,11 @@ Y_UNIT_TEST_SUITE(KqpCost) { //runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); } - Y_UNIT_TEST_TWIN(IndexLookup, StreamLookup) { - TKikimrRunner kikimr(GetAppConfig(true, StreamLookup)); + Y_UNIT_TEST_QUAD(IndexLookup, StreamLookup, useSink) { + if (useSink && !StreamLookup) { + return; + } + TKikimrRunner kikimr(GetAppConfig(true, StreamLookup, false, useSink)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -117,10 +128,9 @@ Y_UNIT_TEST_SUITE(KqpCost) { PRIMARY KEY (Key), INDEX Index GLOBAL ON (Fk) ); - )").GetValueSync(); - session.ExecuteDataQuery(R"( + auto prepare = session.ExecuteDataQuery(R"( REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES (1, 1, "Payload1", 100), (2, 2, "Payload2", 200), @@ -129,6 +139,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { (7, NULL, "Payload7", 700), (NULL, NULL, "Payload8", 800); )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(prepare.GetStatus(), EStatus::SUCCESS, prepare.GetIssues().ToString()); auto query = Q_(R"( SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1; @@ -167,8 +178,11 @@ Y_UNIT_TEST_SUITE(KqpCost) { UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8); } - Y_UNIT_TEST_TWIN(IndexLookupAtLeast8BytesInStorage, StreamLookup) { - TKikimrRunner kikimr(GetAppConfig(true, StreamLookup)); + Y_UNIT_TEST_QUAD(IndexLookupAtLeast8BytesInStorage, StreamLookup, useSink) { + if (useSink && !StreamLookup) { + return; + } + TKikimrRunner kikimr(GetAppConfig(true, StreamLookup, false, useSink)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -233,8 +247,11 @@ Y_UNIT_TEST_SUITE(KqpCost) { UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8); } - Y_UNIT_TEST_TWIN(IndexLookupAndTake, StreamLookup) { - TKikimrRunner kikimr(GetAppConfig(true, StreamLookup)); + Y_UNIT_TEST_QUAD(IndexLookupAndTake, StreamLookup, useSink) { + if (useSink && !StreamLookup) { + return; + } + TKikimrRunner kikimr(GetAppConfig(true, StreamLookup, false, useSink)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -299,7 +316,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { } Y_UNIT_TEST(PointLookup) { - TKikimrRunner kikimr(GetAppConfig(false, false)); + TKikimrRunner kikimr(GetAppConfig()); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -353,7 +370,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { } Y_UNIT_TEST_TWIN(IndexLookupJoin, StreamLookupJoin) { - TKikimrRunner kikimr(GetAppConfig(true, true, StreamLookupJoin)); + TKikimrRunner kikimr(GetAppConfig(true, true, StreamLookupJoin, false)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -486,6 +503,466 @@ Y_UNIT_TEST_SUITE(KqpCost) { CompareYson(Expected, res.ResultSetYson); } + void CreateTestTable(auto session, bool isColumn) { + UNIT_ASSERT(session.ExecuteQuery(std::format(R"( + --!syntax_v1 + CREATE TABLE `/Root/TestTable` ( + Group Uint32 not null, + Name String not null, + Amount Uint64, + Comment String, + PRIMARY KEY (Group, Name) + ) WITH ( + STORE = {}, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10 + ); + + )", isColumn ? "COLUMN" : "ROW"), NYdb::NQuery::TTxControl::NoTx()).GetValueSync().IsSuccess()); + + auto result = session.ExecuteQuery(R"( + REPLACE INTO `/Root/TestTable` (Group, Name, Amount, Comment) VALUES + (1u, "Anna", 3500ul, "None"), + (1u, "Paul", 300ul, "None"), + (2u, "Tony", 7200ul, "None"); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + Y_UNIT_TEST(OlapPointLookup) { + TKikimrRunner kikimr(GetAppConfig(false, false)); + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + CreateTestTable(session, true); + + auto query = Q_(R"( + SELECT * FROM `/Root/TestTable` WHERE Group = 1u AND Name = "Anna"; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + CompareYson(R"( + [ + [[3500u];["None"]; + 1u;"Anna"] + ] + )", NYdb::FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.query_phases().size() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).reads().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).reads().bytes(), 36); + } + + Y_UNIT_TEST(OlapRange) { + TKikimrRunner kikimr(GetAppConfig()); + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + CreateTestTable(session, true); + + auto query = Q_(R"( + SELECT * FROM `/Root/TestTable` WHERE Group < 2u ORDER BY Group, Name; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + CompareYson(R"( + [ + [[3500u];["None"];1u;"Anna"]; + [[300u];["None"];1u;"Paul"] + ] + )", NYdb::FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).reads().rows(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).reads().bytes(), 72); + } + + Y_UNIT_TEST(OlapRangeFullScan) { + TKikimrRunner kikimr(GetAppConfig()); + + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + CreateTestTable(session, true); + + auto query = Q_(R"( + SELECT * FROM `/Root/TestTable` WHERE Amount < 5000ul ORDER BY Group, Name LIMIT 1; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + CompareYson(R"( + [ + [[3500u];["None"];1u;"Anna"] + ] + )", NYdb::FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 2); // Limit??? + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().bytes(), 72); + } + + Y_UNIT_TEST(OlapWriteRow) { + TKikimrRunner kikimr(GetAppConfig(false, false, false, true)); + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + CreateTestTable(session, true); + + { + auto query = Q_(R"( + REPLACE INTO `/Root/TestTable` (Group, Name, Amount, Comment) VALUES (1u, "Anna", 3500u, "None"); + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); + } + + { + auto query = Q_(R"( + UPSERT INTO `/Root/TestTable` (Group, Name, Amount, Comment) VALUES (1u, "Anna", 3500u, "None"); + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); + } + + { + // INSERT EXISTS + auto query = Q_(R"( + INSERT INTO `/Root/TestTable` (Group, Name, Amount, Comment) VALUES (1u, "Anna", 3500u, "None"); + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + // TODO: fix status? + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + // TODO: reads??? + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().bytes(), 8); + } + + { + // INSERT NEW + auto query = Q_(R"( + INSERT INTO `/Root/TestTable` (Group, Name, Amount, Comment) VALUES (3u, "Anna", 3500u, "None"); + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); + } + + { + // UPDATE empty + auto query = Q_(R"( + UPDATE `/Root/TestTable` ON SELECT 4u AS Group, "Anna" AS Name, 4000u AS Amount, "None" AS Comment; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + // No reads & no writes + for (int phase = 0; phase < stats.query_phases_size(); ++phase) { + if (stats.query_phases(phase).table_access_size() > 0) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access_size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 0); + } + } + } + + { + // UPDATE + auto query = Q_(R"( + UPDATE `/Root/TestTable` ON SELECT 3u AS Group, "Anna" AS Name, 4000u AS Amount, "None" AS Comment; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); + // TODO: reads??? + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().bytes(), 8); + } + + { + // DELETE empty + auto query = Q_(R"( + DELETE FROM `/Root/TestTable` ON SELECT 4u AS Group, "Anna" AS Name; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).deletes().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).deletes().bytes(), 0); + } + + { + // DELETE + auto query = Q_(R"( + DELETE FROM `/Root/TestTable` ON SELECT 3u AS Group, "Anna" AS Name; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).deletes().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).deletes().bytes(), 0); + } + } + + Y_UNIT_TEST_TWIN(OltpWriteRow, isSink) { + TKikimrRunner kikimr(GetAppConfig(false, false, false, isSink)); + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + CreateTestTable(session, false); + + { + auto query = Q_(R"( + REPLACE INTO `/Root/TestTable` (Group, Name, Amount, Comment) VALUES (1u, "Anna", 3500u, "None"); + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); + } + + { + auto query = Q_(R"( + UPSERT INTO `/Root/TestTable` (Group, Name, Amount, Comment) VALUES (1u, "Anna", 3500u, "None"); + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); + } + + { + // INSERT EXISTS + auto query = Q_(R"( + INSERT INTO `/Root/TestTable` (Group, Name, Amount, Comment) VALUES (1u, "Anna", 3500u, "None"); + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + if (isSink) { // TODO: fix status? + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); + } else { + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED); + } + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + if (isSink) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access_size(), 0); + // TODO: reads??? + } else { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().bytes(), 8); + } + } + + { + // INSERT NEW + auto query = Q_(R"( + INSERT INTO `/Root/TestTable` (Group, Name, Amount, Comment) VALUES (3u, "Anna", 3500u, "None"); + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); + } + + { + // UPDATE empty + auto query = Q_(R"( + UPDATE `/Root/TestTable` ON SELECT 4u AS Group, "Anna" AS Name, 4000u AS Amount, "None" AS Comment; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + // No reads & no writes + for (int phase = 0; phase < stats.query_phases_size(); ++phase) { + if (stats.query_phases(phase).table_access_size() > 0) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access_size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 0); + } + } + } + + { + // UPDATE + auto query = Q_(R"( + UPDATE `/Root/TestTable` ON SELECT 3u AS Group, "Anna" AS Name, 4000u AS Amount, "None" AS Comment; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); + if (!isSink) { + // TODO: reads??? + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().bytes(), 8); + } + } + + { + // DELETE empty + auto query = Q_(R"( + DELETE FROM `/Root/TestTable` ON SELECT 4u AS Group, "Anna" AS Name; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).deletes().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).deletes().bytes(), 0); + } + + { + // DELETE + auto query = Q_(R"( + DELETE FROM `/Root/TestTable` ON SELECT 3u AS Group, "Anna" AS Name; + )"); + + auto txControl = NYdb::NQuery::TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + size_t phase = stats.query_phases_size() - 1; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).deletes().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).deletes().bytes(), 0); + } + } + } } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 40f3f7289176..079e990758a7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -198,6 +198,8 @@ struct IDqComputeActorAsyncOutput { virtual TMaybe ExtraData() { return {}; } + virtual void FillExtraStats(NDqProto::TDqTaskStats* /* stats */, bool /* finalized stats */, const NYql::NDq::TDqMeteringStats*) { } + virtual void PassAway() = 0; // The same signature as IActor::PassAway() virtual ~IDqComputeActorAsyncOutput() = default; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 57c4d781768e..d08d79c32e7e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1801,6 +1801,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped egressRows += egressStats.Rows ? egressStats.Rows : pushStats.Rows; // p.s. sink == sinkInfo.Buffer } + + if (auto* source = sinkInfo.AsyncOutput) { + source->FillExtraStats(protoTask, last, GetMeteringStats()); + } } protoTask->SetFinishTimeMs(finishTimeMs);