Commit

fix when limit per node logic assigns incorrect shard hints in single shard scenario (#5362)
gridnevvvit authored Jun 10, 2024
1 parent 99f0004 commit 9525b0f
Showing 2 changed files with 36 additions and 16 deletions.
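
In essence: the executer previously distributed bare TShardKeyRanges pointers across tasks and only cleared a task's ShardIdHint when the task ended up with more than one shard. A task that ended up with exactly one shard kept whatever hint it was created with, which after redistribution could point at the wrong shard. The commit pairs each ranges set with its shard id (the new TShardRangesWithShardId struct), creates per-node tasks without a shard id, and sets the hint only when a task reads exactly one shard whose id is known. Hedged sketches of the two key pieces follow the relevant hunks below.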
45 changes: 31 additions & 14 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -76,6 +76,12 @@ inline bool IsDebugLogEnabled() {
         TlsActivationContext->LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::KQP_EXECUTER);
 }
 
+struct TShardRangesWithShardId {
+    TMaybe<ui64> ShardId;
+    const TShardKeyRanges* Ranges;
+};
+
+
 TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
     const NKikimrKqp::TRlPath& path);
 
@@ -986,21 +992,32 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
         }
     }
 
-    TVector<TVector<const TShardKeyRanges*>> DistributeShardsToTasks(TVector<const TShardKeyRanges*> shardsRanges, const size_t tasksCount, const TVector<NScheme::TTypeInfo>& keyTypes) {
-        std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardKeyRanges* lhs, const TShardKeyRanges* rhs) {
+    TVector<TVector<TShardRangesWithShardId>> DistributeShardsToTasks(TVector<TShardRangesWithShardId> shardsRanges, const size_t tasksCount, const TVector<NScheme::TTypeInfo>& keyTypes) {
+        if (IsDebugLogEnabled()) {
+            TStringBuilder sb;
+            sb << "Distributing shards to tasks: [";
+            for(size_t i = 0; i < shardsRanges.size(); i++) {
+                sb << "# " << i << ": " << shardsRanges[i].Ranges->ToString(keyTypes, *AppData()->TypeRegistry);
+            }
+
+            sb << " ].";
+            LOG_D(sb);
+        }
+
+        std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
             // Special case for infinity
-            if (lhs->GetRightBorder().first->GetCells().empty() || rhs->GetRightBorder().first->GetCells().empty()) {
-                YQL_ENSURE(!lhs->GetRightBorder().first->GetCells().empty() || !rhs->GetRightBorder().first->GetCells().empty());
-                return rhs->GetRightBorder().first->GetCells().empty();
+            if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) {
+                YQL_ENSURE(!lhs.Ranges->GetRightBorder().first->GetCells().empty() || !rhs.Ranges->GetRightBorder().first->GetCells().empty());
+                return rhs.Ranges->GetRightBorder().first->GetCells().empty();
             }
             return CompareTypedCellVectors(
-                lhs->GetRightBorder().first->GetCells().data(),
-                rhs->GetRightBorder().first->GetCells().data(),
+                lhs.Ranges->GetRightBorder().first->GetCells().data(),
+                rhs.Ranges->GetRightBorder().first->GetCells().data(),
                 keyTypes.data(), keyTypes.size()) < 0;
         });
 
         // One shard (ranges set) can be assigned only to one task. Otherwise, we can break some optimizations like removing unnecessary shuffle.
-        TVector<TVector<const TShardKeyRanges*>> result(tasksCount);
+        TVector<TVector<TShardRangesWithShardId>> result(tasksCount);
         size_t shardIndex = 0;
         for (size_t taskIndex = 0; taskIndex < tasksCount; ++taskIndex) {
             const size_t tasksLeft = tasksCount - taskIndex;
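
The body of the distribution loop is collapsed in this view. As a rough standalone illustration only (hypothetical simplified types, not the real KQP API), the following sketch shows the invariant the comment above describes: shards are sorted by right key border and then dealt out in contiguous, disjoint slices, so no shard is ever split across two tasks.

#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for TShardRangesWithShardId; TLess is any strict weak
// ordering, playing the role of the right-border comparison above.
template <typename TShard, typename TLess>
std::vector<std::vector<TShard>> DistributeContiguously(
        std::vector<TShard> shards, std::size_t tasksCount, TLess less) {
    std::sort(shards.begin(), shards.end(), less);

    std::vector<std::vector<TShard>> result(tasksCount);
    std::size_t shardIndex = 0;
    for (std::size_t taskIndex = 0; taskIndex < tasksCount; ++taskIndex) {
        // Spread the remaining shards evenly over the remaining tasks;
        // each shard is consumed exactly once, so no shard spans two tasks.
        const std::size_t tasksLeft = tasksCount - taskIndex;
        const std::size_t shardsLeft = shards.size() - shardIndex;
        const std::size_t take = (shardsLeft + tasksLeft - 1) / tasksLeft;  // ceiling division
        for (std::size_t i = 0; i < take; ++i) {
            result[taskIndex].push_back(std::move(shards[shardIndex++]));
        }
    }
    return result;
}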
@@ -1129,7 +1146,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
         };
 
         THashMap<ui64, TVector<ui64>> nodeIdToTasks;
-        THashMap<ui64, TVector<const TShardKeyRanges*>> nodeIdToShardKeyRanges;
+        THashMap<ui64, TVector<TShardRangesWithShardId>> nodeIdToShardKeyRanges;
 
         auto addPartiton = [&](
             ui64 taskLocation,
@@ -1155,11 +1172,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
                 const auto maxScanTasksPerNode = GetScanTasksPerNode(stageInfo, /* isOlapScan */ false, *nodeId);
                 auto& nodeTasks = nodeIdToTasks[*nodeId];
                 if (nodeTasks.size() < maxScanTasksPerNode) {
-                    const auto& task = createNewTask(nodeId, taskLocation, shardId, maxInFlightShards);
+                    const auto& task = createNewTask(nodeId, taskLocation, {}, maxInFlightShards);
                     nodeTasks.push_back(task.Id);
                 }
 
-                nodeIdToShardKeyRanges[*nodeId].push_back(&*shardInfo.KeyReadRanges);
+                nodeIdToShardKeyRanges[*nodeId].push_back(TShardRangesWithShardId{shardId, &*shardInfo.KeyReadRanges});
             } else {
                 auto& task = createNewTask(nodeId, taskLocation, shardId, maxInFlightShards);
                 const auto& stageSource = stage.GetSources(0);
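
Note that the per-node task is now created with an empty shard id ({}), since a task may end up serving ranges from several shards; the id travels with each ranges set instead and is only re-attached to a task after distribution, in the hunk below.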
@@ -1186,12 +1203,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
 
                 const auto& shardsRangesForTask = rangesDistribution[taskIndex];
 
-                if (shardsRangesForTask.size() > 1) {
-                    settings->ClearShardIdHint();
+                if (shardsRangesForTask.size() == 1 && shardsRangesForTask[0].ShardId) {
+                    settings->SetShardIdHint(*shardsRangesForTask[0].ShardId);
                 }
 
                 for (const auto& shardRanges : shardsRangesForTask) {
-                    shardRanges->SerializeTo(settings);
+                    shardRanges.Ranges->SerializeTo(settings);
                 }
             }
         }
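This last hunk is the heart of the fix: the hint is now derived from the ranges actually assigned to the task, not from whatever shard id the task was created with, and it is set only in the unambiguous single-shard case. A minimal sketch of that rule, with hypothetical simplified types (std::optional standing in for TMaybe):

#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical simplified stand-ins for the KQP types.
struct ShardRangesWithShardId {
    std::optional<uint64_t> ShardId;
    // ranges payload omitted
};

struct ReadSettings {
    std::optional<uint64_t> ShardIdHint;
};

void AssignShardIdHint(ReadSettings& settings,
                       const std::vector<ShardRangesWithShardId>& shardsForTask) {
    // Old behavior (buggy): only *clear* the hint when a task holds more than
    // one shard, leaving a possibly stale hint on single-shard tasks.
    // New behavior: set the hint if and only if the task reads exactly one
    // shard and that shard's id is known.
    if (shardsForTask.size() == 1 && shardsForTask[0].ShardId) {
        settings.ShardIdHint = *shardsForTask[0].ShardId;
    }
}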
7 changes: 5 additions & 2 deletions ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -308,8 +308,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
                 sb << ", ";
             }
         }
-        sb << "], "
-            << ", RetryAttempt: " << RetryAttempt << ", ResolveAttempt: " << ResolveAttempt << " }";
+        sb << "], Points: [";
+        for(size_t i = 0; i < Points.size(); ++i) {
+            sb << "# " << i << ": " << DebugPrintPoint(keyTypes, Points[i].GetCells(), *AppData()->TypeRegistry);
+        }
+        sb << "], RetryAttempt: " << RetryAttempt << ", ResolveAttempt: " << ResolveAttempt << " }";
         return sb;
     }
