Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 committed Sep 17, 2024
1 parent c171453 commit 507c931
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 12 deletions.
4 changes: 2 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2564,8 +2564,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

const bool singlePartitionOptAllowed = !HasOlapTable && !UnknownAffectedShardCount && !HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty();
const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty());
const bool localComputeTasks = !DatashardTxs.empty() && !HasOlapTable;
const bool mayRunTasksLocally = !((HasExternalSources || HasDatashardSourceScan) && DatashardTxs.empty()) && !HasOlapTable;
const bool localComputeTasks = !DatashardTxs.empty();
const bool mayRunTasksLocally = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());

Planner = CreateKqpPlanner({
.TasksGraph = TasksGraph,
Expand Down
98 changes: 90 additions & 8 deletions ydb/core/kqp/opt/kqp_opt_build_txs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,16 +560,20 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
}

if (!query.Effects().Empty()) {
auto tx = BuildTx(query.Effects().Ptr(), ctx, /* isPrecompute */ false);
if (!tx) {
return TStatus::Error;
}
auto collectedEffects = CollectEffects(query.Effects(), ctx);

if (!CheckEffectsTx(tx.Cast(), query, ctx)) {
return TStatus::Error;
}
for (auto& effects : collectedEffects) {
auto tx = BuildTx(effects.Ptr(), ctx, /* isPrecompute */ false);
if (!tx) {
return TStatus::Error;
}

BuildCtx->PhysicalTxs.emplace_back(tx.Cast());
if (!CheckEffectsTx(tx.Cast(), query, ctx)) {
return TStatus::Error;
}

BuildCtx->PhysicalTxs.emplace_back(tx.Cast());
}
}

return TStatus::Ok;
Expand All @@ -581,6 +585,84 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
}

private:
TVector<TExprList> CollectEffects(const TExprList& list, TExprContext& ctx) {
struct TEffectsInfo {
enum class EType {
KQP_EFFECT,
KQP_SINK,
EXTERNAL_SINK,
};

EType Type;
THashSet<TStringBuf> TablesPathIds;
TVector<TExprNode::TPtr> Exprs;
};
TVector<TEffectsInfo> effectsInfos;

for (const auto& expr : list) {
if (auto sinkEffect = expr.Maybe<TKqpSinkEffect>()) {
const size_t sinkIndex = FromString(TStringBuf(sinkEffect.Cast().SinkIndex()));
const auto stage = sinkEffect.Cast().Stage().Maybe<TDqStageBase>();
YQL_ENSURE(stage);
YQL_ENSURE(stage.Cast().Outputs());
const auto outputs = stage.Cast().Outputs().Cast();
YQL_ENSURE(sinkIndex < outputs.Size());
const auto sink = outputs.Item(sinkIndex).Maybe<TDqSink>();
YQL_ENSURE(sink);

const auto sinkSettings = sink.Cast().Settings().Maybe<TKqpTableSinkSettings>();
if (!sinkSettings) {
// External writes always use their own physical transaction.
effectsInfos.emplace_back();
effectsInfos.back().Type = TEffectsInfo::EType::EXTERNAL_SINK;
effectsInfos.back().Exprs.push_back(expr.Ptr());
} else {
// Two table sinks can't be executed in one physical transaction if they write into one table.
const TStringBuf tablePathId = sinkSettings.Cast().Table().PathId().Value();

auto it = std::find_if(
std::begin(effectsInfos),
std::end(effectsInfos),
[&tablePathId](const auto& effectsInfo) {
return effectsInfo.Type == TEffectsInfo::EType::KQP_SINK
&& !effectsInfo.TablesPathIds.contains(tablePathId);
});
if (it == std::end(effectsInfos)) {
effectsInfos.emplace_back();
it = std::prev(std::end(effectsInfos));
it->Type = TEffectsInfo::EType::KQP_SINK;
}
it->TablesPathIds.insert(tablePathId);
it->Exprs.push_back(expr.Ptr());
}
} else {
// Table effects are executed all in one physical transaction.
auto it = std::find_if(
std::begin(effectsInfos),
std::end(effectsInfos),
[](const auto& effectsInfo) { return effectsInfo.Type == TEffectsInfo::EType::KQP_EFFECT; });
if (it == std::end(effectsInfos)) {
effectsInfos.emplace_back();
it = std::prev(std::end(effectsInfos));
it->Type = TEffectsInfo::EType::KQP_EFFECT;
}
it->Exprs.push_back(expr.Ptr());
}
}

TVector<TExprList> results;

for (const auto& effects : effectsInfos) {
auto builder = Build<TExprList>(ctx, list.Pos());
for (const auto& expr : effects.Exprs) {
builder.Add(expr);
}
results.push_back(builder.Done());
}

return results;
}

bool HasTableEffects(const TKqlQuery& query) const {
for (const TExprBase& effect : query.Effects()) {
if (auto maybeSinkEffect = effect.Maybe<TKqpSinkEffect>()) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
return;
}

Cerr << "COMPILED " << QueryState->CompileResult->PreparedQuery->GetPhysicalQuery().GetQueryAst() << Endl;

Become(&TKqpSessionActor::ExecuteState);

QueryState->TxCtx->OnBeginQuery();
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3283,9 +3283,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
{
auto result = client.ExecuteQuery(R"(
UPSERT INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, "test", 13);
(1u, "test1", 10), (2u, "test2", 11);
REPLACE INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES
(3u, "test3", 12), (4u, "test", 13);
UPSERT INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES
(10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, "test", 13);
(10u, "test1", 10), (20u, "test2", 11);
REPLACE INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES
(30u, "test3", 12), (40u, "test", 13);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
Expand Down

0 comments on commit 507c931

Please sign in to comment.