diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 32ac91b200d9..a755600c98ce 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -67,7 +67,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedMutableTables()) { @@ -81,9 +81,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedSetTablePath(StreamLookupWorker->GetTablePath()); } + ui64 consumedRows = mstats ? mstats->Inputs[InputIndex]->RowsConsumed : ReadRowsCount; + ui64 consumedBytes = mstats ? mstats->Inputs[InputIndex]->BytesConsumed : ReadBytesCount; + // TODO: use evread statistics after KIKIMR-16924 - tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount); - tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount); + tableStats->SetReadRows(tableStats->GetReadRows() + consumedRows); + tableStats->SetReadBytes(tableStats->GetReadBytes() + consumedBytes); tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size()); NKqpProto::TKqpTableExtraStats tableExtraStats; @@ -148,7 +151,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped { - explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false) + explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false) : ReadId(readId) , LastSeqNo(lastSeqNo) , InstantStart(instantStart) { @@ -259,7 +262,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedGetTablePath()); if (ev->Get()->Request->ErrorCount > 0) { - TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: " + TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: " << StreamLookupWorker->GetTablePath(); LookupActorStateSpan.EndError(errorMsg); @@ -419,7 +422,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedGet()->ReadId); YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId); auto& read = readIt->second; - + if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) { if (ev->Get()->InstantStart) { read.SetFinished(); @@ -566,7 +569,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped{})); Counters->IteratorsShardResolve->Inc(); - LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(), + LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(), "WaitForShardsResolve", NWilson::EFlags::AUTO_END); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {})); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index efd1c6fd72c4..e74ae333440f 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -349,6 +349,7 @@ class TKqpLookupRows : public TKqpStreamLookupWorker { auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); i64 rowSize = 0; + i64 storageRowSize = 0; for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) { const auto& column = Columns[colIndex]; if (IsSystemColumn(column.Name)) { @@ -356,6 +357,7 @@ class TKqpLookupRows : public TKqpStreamLookupWorker { rowSize += sizeof(NUdf::TUnboxedValue); } else { YQL_ENSURE(resultColIndex < resultRow.size()); + storageRowSize += resultRow[resultColIndex].Size(); rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType); rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes; ++resultColIndex; @@ -370,10 +372,12 @@ class TKqpLookupRows : public TKqpStreamLookupWorker { batch.push_back(std::move(row)); + storageRowSize = std::max(storageRowSize, (i64)8); + resultStats.ReadRowsCount += 1; - resultStats.ReadBytesCount += rowSize; + resultStats.ReadBytesCount += storageRowSize; resultStats.ResultRowsCount += 1; - resultStats.ResultBytesCount += rowSize; + resultStats.ResultBytesCount += storageRowSize; } if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 9e484118a66c..fb6de4ec2808 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -43,6 +43,202 @@ Y_UNIT_TEST_SUITE(KqpCost) { //runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); } + Y_UNIT_TEST_TWIN(IndexLookup, StreamLookup) { + TKikimrRunner kikimr(GetAppConfig(true, StreamLookup)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `/Root/SecondaryKeys` ( + Key Int32, + Fk Int32, + Value String, + ValueInt Int32, + PRIMARY KEY (Key), + INDEX Index GLOBAL ON (Fk) + ); + + )").GetValueSync(); + + session.ExecuteDataQuery(R"( + REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES + (1, 1, "Payload1", 100), + (2, 2, "Payload2", 200), + (5, 5, "Payload5", 500), + (NULL, 6, "Payload6", 600), + (7, NULL, "Payload7", 700), + (NULL, NULL, "Payload8", 800); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + + auto query = Q_(R"( + SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1; + )"); + + auto txControl = TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + CompareYson(R"( + [ + [["Payload1"]] + ] + )", NYdb::FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + std::unordered_map> readsByTable; + for(const auto& queryPhase : stats.query_phases()) { + for(const auto& tableAccess: queryPhase.table_access()) { + auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0)); + it->second.first += tableAccess.reads().rows(); + it->second.second += tableAccess.reads().bytes(); + } + } + + for(const auto& [name, rowsAndBytes]: readsByTable) { + Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl; + } + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8); + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8); + } + + Y_UNIT_TEST_TWIN(IndexLookupAtLeast8BytesInStorage, StreamLookup) { + TKikimrRunner kikimr(GetAppConfig(true, StreamLookup)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `/Root/SecondaryKeys` ( + Key Int32, + Fk Int32, + Value String, + ValueInt Int32, + PRIMARY KEY (Key), + INDEX Index GLOBAL ON (Fk) + ); + + )").GetValueSync(); + + session.ExecuteDataQuery(R"( + REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES + (1, 1, "Payload1", 100), + (2, 2, "Payload2", 200), + (5, 5, "Payload5", 500), + (NULL, 6, "Payload6", 600), + (7, NULL, "Payload7", 700), + (NULL, NULL, "Payload8", 800); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + + auto query = Q_(R"( + SELECT ValueInt FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1; + )"); + + auto txControl = TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + CompareYson(R"( + [ + [[100]] + ] + )", NYdb::FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + std::unordered_map> readsByTable; + for(const auto& queryPhase : stats.query_phases()) { + for(const auto& tableAccess: queryPhase.table_access()) { + auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0)); + it->second.first += tableAccess.reads().rows(); + it->second.second += tableAccess.reads().bytes(); + } + } + + for(const auto& [name, rowsAndBytes]: readsByTable) { + Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl; + } + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1); + // 4 bytes is unexpected, because datashards has 8 bytes per row in storage. + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8); + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8); + } + + Y_UNIT_TEST_TWIN(IndexLookupAndTake, StreamLookup) { + TKikimrRunner kikimr(GetAppConfig(true, StreamLookup)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `/Root/SecondaryKeys` ( + Key Int32, + Fk Int32, + Value String, + ValueInt Int32, + PRIMARY KEY (Key), + INDEX Index GLOBAL ON (Fk) + ); + + )").GetValueSync(); + + session.ExecuteDataQuery(R"( + REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES + (1, 1, "Payload1", 100), + (2, 2, "Payload2", 200), + (5, 5, "Payload5", 500), + (NULL, 6, "Payload6", 600), + (7, NULL, "Payload7", 700), + (NULL, NULL, "Payload8", 800); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + + auto query = Q_(R"( + SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk >= 1 and Fk <= 2 AND StartsWith(Value, "Payload") LIMIT 1; + )"); + + auto txControl = TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + CompareYson(R"( + [ + [["Payload1"]] + ] + )", NYdb::FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + std::unordered_map> readsByTable; + for(const auto& queryPhase : stats.query_phases()) { + for(const auto& tableAccess: queryPhase.table_access()) { + auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0)); + it->second.first += tableAccess.reads().rows(); + it->second.second += tableAccess.reads().bytes(); + } + } + + for(const auto& [name, rowsAndBytes]: readsByTable) { + Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl; + } + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8); + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 2); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 16); + } + Y_UNIT_TEST(PointLookup) { TKikimrRunner kikimr(GetAppConfig(false, false)); auto db = kikimr.GetTableClient();