Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: limit-per-node logic assigns incorrect shard hints in the single-shard scenario #5362

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ inline bool IsDebugLogEnabled() {
TlsActivationContext->LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::KQP_EXECUTER);
}

// Key read ranges of a single shard, optionally paired with that shard's id.
// Used when distributing shard ranges across scan tasks: if a task ends up
// with exactly one of these entries and ShardId is set, the id is forwarded
// to the read settings as a ShardIdHint (see the usage later in this file).
struct TShardRangesWithShardId {
// Id of the owning shard; Nothing when the shard id is not known /
// intentionally omitted at the construction site.
TMaybe<ui64> ShardId;
// Non-owning pointer to the shard's key ranges; lifetime is managed by
// the caller that builds the distribution — TODO(review) confirm it
// outlives every task that stores this entry.
const TShardKeyRanges* Ranges;
};


TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
const NKikimrKqp::TRlPath& path);

Expand Down Expand Up @@ -986,21 +992,32 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}
}

TVector<TVector<const TShardKeyRanges*>> DistributeShardsToTasks(TVector<const TShardKeyRanges*> shardsRanges, const size_t tasksCount, const TVector<NScheme::TTypeInfo>& keyTypes) {
std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardKeyRanges* lhs, const TShardKeyRanges* rhs) {
TVector<TVector<TShardRangesWithShardId>> DistributeShardsToTasks(TVector<TShardRangesWithShardId> shardsRanges, const size_t tasksCount, const TVector<NScheme::TTypeInfo>& keyTypes) {
if (IsDebugLogEnabled()) {
TStringBuilder sb;
sb << "Distrubiting shards to tasks: [";
for(size_t i = 0; i < shardsRanges.size(); i++) {
sb << "# " << i << ": " << shardsRanges[i].Ranges->ToString(keyTypes, *AppData()->TypeRegistry);
}

sb << " ].";
LOG_D(sb);
}

std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
// Special case for infinity
if (lhs->GetRightBorder().first->GetCells().empty() || rhs->GetRightBorder().first->GetCells().empty()) {
YQL_ENSURE(!lhs->GetRightBorder().first->GetCells().empty() || !rhs->GetRightBorder().first->GetCells().empty());
return rhs->GetRightBorder().first->GetCells().empty();
if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) {
YQL_ENSURE(!lhs.Ranges->GetRightBorder().first->GetCells().empty() || !rhs.Ranges->GetRightBorder().first->GetCells().empty());
return rhs.Ranges->GetRightBorder().first->GetCells().empty();
}
return CompareTypedCellVectors(
lhs->GetRightBorder().first->GetCells().data(),
rhs->GetRightBorder().first->GetCells().data(),
lhs.Ranges->GetRightBorder().first->GetCells().data(),
rhs.Ranges->GetRightBorder().first->GetCells().data(),
keyTypes.data(), keyTypes.size()) < 0;
});

// One shard (ranges set) can be assigned only to one task. Otherwise, we can break some optimizations like removing unnecessary shuffle.
TVector<TVector<const TShardKeyRanges*>> result(tasksCount);
TVector<TVector<TShardRangesWithShardId>> result(tasksCount);
size_t shardIndex = 0;
for (size_t taskIndex = 0; taskIndex < tasksCount; ++taskIndex) {
const size_t tasksLeft = tasksCount - taskIndex;
Expand Down Expand Up @@ -1129,7 +1146,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
};

THashMap<ui64, TVector<ui64>> nodeIdToTasks;
THashMap<ui64, TVector<const TShardKeyRanges*>> nodeIdToShardKeyRanges;
THashMap<ui64, TVector<TShardRangesWithShardId>> nodeIdToShardKeyRanges;

auto addPartiton = [&](
ui64 taskLocation,
Expand All @@ -1155,11 +1172,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
const auto maxScanTasksPerNode = GetScanTasksPerNode(stageInfo, /* isOlapScan */ false, *nodeId);
auto& nodeTasks = nodeIdToTasks[*nodeId];
if (nodeTasks.size() < maxScanTasksPerNode) {
const auto& task = createNewTask(nodeId, taskLocation, shardId, maxInFlightShards);
const auto& task = createNewTask(nodeId, taskLocation, {}, maxInFlightShards);
nodeTasks.push_back(task.Id);
}

nodeIdToShardKeyRanges[*nodeId].push_back(&*shardInfo.KeyReadRanges);
nodeIdToShardKeyRanges[*nodeId].push_back(TShardRangesWithShardId{shardId, &*shardInfo.KeyReadRanges});
} else {
auto& task = createNewTask(nodeId, taskLocation, shardId, maxInFlightShards);
const auto& stageSource = stage.GetSources(0);
Expand All @@ -1186,12 +1203,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

const auto& shardsRangesForTask = rangesDistribution[taskIndex];

if (shardsRangesForTask.size() > 1) {
settings->ClearShardIdHint();
if (shardsRangesForTask.size() == 1 && shardsRangesForTask[0].ShardId) {
settings->SetShardIdHint(*shardsRangesForTask[0].ShardId);
}

for (const auto& shardRanges : shardsRangesForTask) {
shardRanges->SerializeTo(settings);
shardRanges.Ranges->SerializeTo(settings);
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
sb << ", ";
}
}
sb << "], "
<< ", RetryAttempt: " << RetryAttempt << ", ResolveAttempt: " << ResolveAttempt << " }";
sb << "], Points: [";
for(size_t i = 0; i < Points.size(); ++i) {
sb << "# " << i << ": " << DebugPrintPoint(keyTypes, Points[i].GetCells(), *AppData()->TypeRegistry);
}
sb << "], RetryAttempt: " << RetryAttempt << ", ResolveAttempt: " << ResolveAttempt << " }";
return sb;
}

Expand Down
Loading