From 205c765a56a9f7ca4d026c1bbf7fa021ac94bc7c Mon Sep 17 00:00:00 2001
From: Vitalii Gridnev
Date: Fri, 7 Jun 2024 23:00:33 +0300
Subject: [PATCH] fix when limit per node logic assigns incorrect shard hints
 in single shard scenario (#5339)

---
 .../kqp/executer_actor/kqp_executer_impl.h | 45 +++++++++++++------
 ydb/core/kqp/runtime/kqp_read_actor.cpp    |  7 ++-
 2 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index f0706f346a95..3c407523c6de 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -76,6 +76,12 @@ inline bool IsDebugLogEnabled() {
         TlsActivationContext->LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::KQP_EXECUTER);
 }
 
+struct TShardRangesWithShardId {
+    TMaybe<ui64> ShardId;
+    const TShardKeyRanges* Ranges;
+};
+
+
 TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
     const NKikimrKqp::TRlPath& path);
 
@@ -986,21 +992,32 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
         }
     }
 
-    TVector<TVector<const TShardKeyRanges*>> DistributeShardsToTasks(TVector<const TShardKeyRanges*> shardsRanges, const size_t tasksCount, const TVector<NScheme::TTypeInfo>& keyTypes) {
-        std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardKeyRanges* lhs, const TShardKeyRanges* rhs) {
+    TVector<TVector<TShardRangesWithShardId>> DistributeShardsToTasks(TVector<TShardRangesWithShardId> shardsRanges, const size_t tasksCount, const TVector<NScheme::TTypeInfo>& keyTypes) {
+        if (IsDebugLogEnabled()) {
+            TStringBuilder sb;
+            sb << "Distributing shards to tasks: [";
+            for(size_t i = 0; i < shardsRanges.size(); i++) {
+                sb << "# " << i << ": " << shardsRanges[i].Ranges->ToString(keyTypes, *AppData()->TypeRegistry);
+            }
+
+            sb << " ].";
+            LOG_D(sb);
+        }
+
+        std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
             // Special case for infinity
-            if (lhs->GetRightBorder().first->GetCells().empty() || rhs->GetRightBorder().first->GetCells().empty()) {
-                YQL_ENSURE(!lhs->GetRightBorder().first->GetCells().empty() || !rhs->GetRightBorder().first->GetCells().empty());
-                return rhs->GetRightBorder().first->GetCells().empty();
+            if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) {
+                YQL_ENSURE(!lhs.Ranges->GetRightBorder().first->GetCells().empty() || !rhs.Ranges->GetRightBorder().first->GetCells().empty());
+                return rhs.Ranges->GetRightBorder().first->GetCells().empty();
             }
 
             return CompareTypedCellVectors(
-                lhs->GetRightBorder().first->GetCells().data(),
-                rhs->GetRightBorder().first->GetCells().data(),
+                lhs.Ranges->GetRightBorder().first->GetCells().data(),
+                rhs.Ranges->GetRightBorder().first->GetCells().data(),
                 keyTypes.data(), keyTypes.size()) < 0;
         });
 
         // One shard (ranges set) can be assigned only to one task. Otherwise, we can break some optimizations like removing unnecessary shuffle.
-        TVector<TVector<const TShardKeyRanges*>> result(tasksCount);
+        TVector<TVector<TShardRangesWithShardId>> result(tasksCount);
         size_t shardIndex = 0;
         for (size_t taskIndex = 0; taskIndex < tasksCount; ++taskIndex) {
             const size_t tasksLeft = tasksCount - taskIndex;
@@ -1129,7 +1146,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
         };
 
         THashMap<ui64, TVector<ui64>> nodeIdToTasks;
-        THashMap<ui64, TVector<const TShardKeyRanges*>> nodeIdToShardKeyRanges;
+        THashMap<ui64, TVector<TShardRangesWithShardId>> nodeIdToShardKeyRanges;
 
         auto addPartiton = [&](
             ui64 taskLocation,
@@ -1155,11 +1172,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
             const auto maxScanTasksPerNode = GetScanTasksPerNode(stageInfo, /* isOlapScan */ false, *nodeId);
             auto& nodeTasks = nodeIdToTasks[*nodeId];
             if (nodeTasks.size() < maxScanTasksPerNode) {
-                const auto& task = createNewTask(nodeId, taskLocation, shardId, maxInFlightShards);
+                const auto& task = createNewTask(nodeId, taskLocation, {}, maxInFlightShards);
                 nodeTasks.push_back(task.Id);
             }
 
-            nodeIdToShardKeyRanges[*nodeId].push_back(&*shardInfo.KeyReadRanges);
+            nodeIdToShardKeyRanges[*nodeId].push_back(TShardRangesWithShardId{shardId, &*shardInfo.KeyReadRanges});
         } else {
             auto& task = createNewTask(nodeId, taskLocation, shardId, maxInFlightShards);
             const auto& stageSource = stage.GetSources(0);
@@ -1186,12 +1203,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
 
                 const auto& shardsRangesForTask = rangesDistribution[taskIndex];
 
-                if (shardsRangesForTask.size() > 1) {
-                    settings->ClearShardIdHint();
+                if (shardsRangesForTask.size() == 1 && shardsRangesForTask[0].ShardId) {
+                    settings->SetShardIdHint(*shardsRangesForTask[0].ShardId);
                 }
 
                 for (const auto& shardRanges : shardsRangesForTask) {
-                    shardRanges->SerializeTo(settings);
+                    shardRanges.Ranges->SerializeTo(settings);
                 }
             }
         }
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 1b69097b7b48..75e09b43c8f7 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -308,8 +308,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
                 sb << ", ";
             }
         }
-        sb << "], "
-            << ", RetryAttempt: " << RetryAttempt << ", ResolveAttempt: " << ResolveAttempt << " }";
+        sb << "], Points: [";
+        for(size_t i = 0; i < Points.size(); ++i) {
+            sb << "# " << i << ": " << DebugPrintPoint(keyTypes, Points[i].GetCells(), *AppData()->TypeRegistry);
+        }
+        sb << "], RetryAttempt: " << RetryAttempt << ", ResolveAttempt: " << ResolveAttempt << " }";
         return sb;
     }
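
The executer change boils down to one rule: under the per-node task limit, tasks are now created without a shard id hint, the owning shard id travels with each ranges set (TShardRangesWithShardId), and the hint is set only when a task ends up holding exactly one shard's ranges with a known id. The old code pre-set the hint at task creation and merely cleared it when size() > 1, so after redistribution a task holding a single shard's ranges could keep a hint pointing at the wrong shard. Below is a minimal standalone sketch of that rule in plain C++17; the types TRangesWithShard, TReadSettings, and ApplyRangesToTask are simplified stand-ins invented for illustration, not the YDB classes.

    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    // Stand-ins for TShardRangesWithShardId and the protobuf read settings;
    // only the fields the hint rule touches are modeled.
    struct TRangesWithShard {
        std::optional<uint64_t> ShardId; // known shard id, or empty
        std::string Ranges;              // placeholder for TShardKeyRanges
    };

    struct TReadSettings {
        std::optional<uint64_t> ShardIdHint;
        std::vector<std::string> Ranges;
    };

    // Set the hint only when the task reads exactly one shard's ranges and
    // that shard id is known; otherwise leave the task hint-free.
    void ApplyRangesToTask(TReadSettings& settings, const std::vector<TRangesWithShard>& forTask) {
        if (forTask.size() == 1 && forTask[0].ShardId) {
            settings.ShardIdHint = *forTask[0].ShardId;
        }
        for (const auto& r : forTask) {
            settings.Ranges.push_back(r.Ranges);
        }
    }

    int main() {
        TReadSettings single, merged;
        ApplyRangesToTask(single, {{42, "[a, b)"}});                 // one shard: hint = 42
        ApplyRangesToTask(merged, {{42, "[a, b)"}, {43, "[b, c)"}}); // two shards: no hint
        std::cout << "single-shard hint: " << single.ShardIdHint.value_or(0) << "\n";
        std::cout << "merged has hint: " << merged.ShardIdHint.has_value() << "\n";
    }

Deriving the hint from the ranges a task actually receives, rather than from whichever shard happened to trigger task creation, keeps the hint consistent with the distribution by construction.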