Skip to content

Commit

Permalink
Fix sinks order (#9345)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 19, 2024
1 parent 2a0bfb0 commit 6e3fc43
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 21 deletions.
5 changes: 0 additions & 5 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@ ydb/core/kqp/ut/service [*/*]*
ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries
ydb/core/kqp/ut/service KqpQueryService.TableSink_Htap
ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink
ydb/core/kqp/ut/tx KqpSinkMvcc.OltpNamedStatement
ydb/core/kqp/ut/tx KqpSinkMvcc.OlapNamedStatement
ydb/core/kqp/ut/tx KqpSinkMvcc.OltpMultiSinks
ydb/core/kqp/ut/tx KqpSinkMvcc.OlapMultiSinks
ydb/core/persqueue/ut [*/*]*
ydb/core/persqueue/ut TPQTest.*DirectRead*
Expand Down
106 changes: 94 additions & 12 deletions ydb/core/kqp/opt/kqp_opt_build_txs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,16 +560,20 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
}

if (!query.Effects().Empty()) {
auto tx = BuildTx(query.Effects().Ptr(), ctx, /* isPrecompute */ false);
if (!tx) {
return TStatus::Error;
}
auto collectedEffects = CollectEffects(query.Effects(), ctx);

if (!CheckEffectsTx(tx.Cast(), query, ctx)) {
return TStatus::Error;
}
for (auto& effects : collectedEffects) {
auto tx = BuildTx(effects.Ptr(), ctx, /* isPrecompute */ false);
if (!tx) {
return TStatus::Error;
}

BuildCtx->PhysicalTxs.emplace_back(tx.Cast());
if (!CheckEffectsTx(tx.Cast(), effects, ctx)) {
return TStatus::Error;
}

BuildCtx->PhysicalTxs.emplace_back(tx.Cast());
}
}

return TStatus::Ok;
Expand All @@ -581,8 +585,86 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
}

private:
bool HasTableEffects(const TKqlQuery& query) const {
for (const TExprBase& effect : query.Effects()) {
TVector<TExprList> CollectEffects(const TExprList& list, TExprContext& ctx) {
struct TEffectsInfo {
enum class EType {
KQP_EFFECT,
KQP_SINK,
EXTERNAL_SINK,
};

EType Type;
THashSet<TStringBuf> TablesPathIds;
TVector<TExprNode::TPtr> Exprs;
};
TVector<TEffectsInfo> effectsInfos;

for (const auto& expr : list) {
if (auto sinkEffect = expr.Maybe<TKqpSinkEffect>()) {
const size_t sinkIndex = FromString(TStringBuf(sinkEffect.Cast().SinkIndex()));
const auto stage = sinkEffect.Cast().Stage().Maybe<TDqStageBase>();
YQL_ENSURE(stage);
YQL_ENSURE(stage.Cast().Outputs());
const auto outputs = stage.Cast().Outputs().Cast();
YQL_ENSURE(sinkIndex < outputs.Size());
const auto sink = outputs.Item(sinkIndex).Maybe<TDqSink>();
YQL_ENSURE(sink);

const auto sinkSettings = sink.Cast().Settings().Maybe<TKqpTableSinkSettings>();
if (!sinkSettings) {
// External writes always use their own physical transaction.
effectsInfos.emplace_back();
effectsInfos.back().Type = TEffectsInfo::EType::EXTERNAL_SINK;
effectsInfos.back().Exprs.push_back(expr.Ptr());
} else {
// Two table sinks can't be executed in one physical transaction if they write into one table.
const TStringBuf tablePathId = sinkSettings.Cast().Table().PathId().Value();

auto it = std::find_if(
std::begin(effectsInfos),
std::end(effectsInfos),
[&tablePathId](const auto& effectsInfo) {
return effectsInfo.Type == TEffectsInfo::EType::KQP_SINK
&& !effectsInfo.TablesPathIds.contains(tablePathId);
});
if (it == std::end(effectsInfos)) {
effectsInfos.emplace_back();
it = std::prev(std::end(effectsInfos));
it->Type = TEffectsInfo::EType::KQP_SINK;
}
it->TablesPathIds.insert(tablePathId);
it->Exprs.push_back(expr.Ptr());
}
} else {
// Table effects are executed all in one physical transaction.
auto it = std::find_if(
std::begin(effectsInfos),
std::end(effectsInfos),
[](const auto& effectsInfo) { return effectsInfo.Type == TEffectsInfo::EType::KQP_EFFECT; });
if (it == std::end(effectsInfos)) {
effectsInfos.emplace_back();
it = std::prev(std::end(effectsInfos));
it->Type = TEffectsInfo::EType::KQP_EFFECT;
}
it->Exprs.push_back(expr.Ptr());
}
}

TVector<TExprList> results;

for (const auto& effects : effectsInfos) {
auto builder = Build<TExprList>(ctx, list.Pos());
for (const auto& expr : effects.Exprs) {
builder.Add(expr);
}
results.push_back(builder.Done());
}

return results;
}

bool HasTableEffects(const TExprList& effectsList) const {
for (const TExprBase& effect : effectsList) {
if (auto maybeSinkEffect = effect.Maybe<TKqpSinkEffect>()) {
// (KqpSinkEffect (DqStage (... ((DqSink '0 (DataSink '"kikimr") ...)))) '0)
auto sinkEffect = maybeSinkEffect.Cast();
Expand All @@ -608,7 +690,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
return false;
}

bool CheckEffectsTx(TKqpPhysicalTx tx, const TKqlQuery& query, TExprContext& ctx) const {
bool CheckEffectsTx(TKqpPhysicalTx tx, const TExprList& effectsList, TExprContext& ctx) const {
TMaybeNode<TExprBase> blackistedNode;
VisitExpr(tx.Ptr(), [&blackistedNode](const TExprNode::TPtr& exprNode) {
if (blackistedNode) {
Expand All @@ -630,7 +712,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
return true;
});

if (blackistedNode && HasTableEffects(query)) {
if (blackistedNode && HasTableEffects(effectsList)) {
ctx.AddError(TIssue(ctx.GetPosition(blackistedNode.Cast().Pos()), TStringBuilder()
<< "Callable not expected in effects tx: " << blackistedNode.Cast<TCallable>().CallableName()));
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,8 +1224,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto db = kikimr->GetQueryClient();
auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx());
resultFuture.Wait();
UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(resultFuture.GetValueSync().GetIssues().ToString(), "Callable not expected in effects tx: Unwrap");
UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
}
}

Expand Down
124 changes: 122 additions & 2 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3174,7 +3174,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1)));
}

/*{
{
auto result = client.ExecuteQuery(R"(
DELETE FROM `/Root/ColumnShard` ON SELECT * FROM `/Root/DataShard` WHERE Col1 > 9;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
Expand Down Expand Up @@ -3223,7 +3223,127 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
DELETE FROM `/Root/ColumnShard` WHERE Col2 = "not found";
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}*/
}
}

Y_UNIT_TEST_TWIN(TableSink_HtapComplex, withOltpSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink);
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();

const TString query = R"(
CREATE TABLE `/Root/ColumnSrc` (
Col1 Uint64 NOT NULL,
Col2 String NOT NULL,
Col3 Int32 NOT NULL,
PRIMARY KEY (Col1)
)
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
CREATE TABLE `/Root/RowSrc` (
Col1 Uint64 NOT NULL,
Col2 String NOT NULL,
Col3 Int32 NOT NULL,
PRIMARY KEY (Col1)
)
WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2);
CREATE TABLE `/Root/ColumnDst` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32,
PRIMARY KEY (Col1)
)
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
CREATE TABLE `/Root/RowDst` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32,
PRIMARY KEY (Col1)
)
WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

auto client = kikimr.GetQueryClient();

{
auto result = client.ExecuteQuery(R"(
UPSERT INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES
(1u, "test1", 10), (2u, "test2", 11);
REPLACE INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES
(3u, "test3", 12), (4u, "test", 13);
UPSERT INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES
(10u, "test1", 10), (20u, "test2", 11);
REPLACE INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES
(30u, "test3", 12), (40u, "test", 13);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
$data = SELECT c.Col1 as Col1, c.Col2 As Col2, r.Col3 AS Col3
FROM `/Root/ColumnSrc`as c
JOIN `/Root/RowSrc` as r
ON c.Col1 + 10 = r.Col3;
UPSERT INTO `/Root/ColumnDst` SELECT * FROM $data;
REPLACE INTO `/Root/RowDst` SELECT * FROM $data;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/ColumnDst`;
SELECT COUNT(*) FROM `/Root/RowDst`;
DELETE FROM `/Root/ColumnDst` WHERE 1=1;
DELETE FROM `/Root/RowDst` WHERE 1=1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(1)));
}

{
auto result = client.ExecuteQuery(R"(
$prepare = SELECT *
FROM `/Root/ColumnSrc`
WHERE Col2 LIKE '%test%test%';
$data = SELECT c.Col1 as Col1, c.Col2 As Col2, r.Col3 AS Col3
FROM `/Root/RowSrc`as c
LEFT OUTER JOIN $prepare as r
ON c.Col1 + 10 = r.Col3;
UPSERT INTO `/Root/ColumnDst` SELECT * FROM $data;
REPLACE INTO `/Root/RowDst` SELECT * FROM $data;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/ColumnDst`;
SELECT COUNT(*) FROM `/Root/RowDst`;
DELETE FROM `/Root/ColumnDst` WHERE 1=1;
DELETE FROM `/Root/RowDst` WHERE 1=1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1)));
}
}

Y_UNIT_TEST_TWIN(TableSink_HtapInteractive, withOltpSink) {
Expand Down

0 comments on commit 6e3fc43

Please sign in to comment.