diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 532ed349b4e3..63ec4a7dc929 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -546,10 +546,9 @@ "Match": {"Type": "Callable", "Name": "KqpTableSinkSettings"}, "Children": [ {"Index": 0, "Name": "Table", "Type": "TKqpTable"}, - {"Index": 1, "Name": "Columns", "Type": "TCoAtomList"}, - {"Index": 2, "Name": "InconsistentWrite", "Type": "TCoAtom"}, - {"Index": 3, "Name": "Mode", "Type": "TCoAtom"}, - {"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + {"Index": 1, "Name": "InconsistentWrite", "Type": "TCoAtom"}, + {"Index": 2, "Name": "Mode", "Type": "TCoAtom"}, + {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} ] }, { diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp index d0cb2fb36211..0c0e818d6853 100644 --- a/ydb/core/kqp/opt/kqp_opt_effects.cpp +++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp @@ -231,7 +231,7 @@ TCoAtomList BuildKeyColumnsList(const TKikimrTableDescription& table, TPositionH .Done(); } -TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const TCoAtomList& columns, +TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const bool allowInconsistentWrites, const TStringBuf mode, TExprContext& ctx) { Y_DEBUG_ABORT_UNLESS(IsDqPureExpr(expr)); @@ -253,7 +253,6 @@ TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const .Index().Value("0").Build() .Settings() .Table(table) - .Columns(columns) .InconsistentWrite(allowInconsistentWrites ? ctx.NewAtom(expr.Pos(), "true") : ctx.NewAtom(expr.Pos(), "false")) @@ -311,7 +310,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const if (IsDqPureExpr(node.Input())) { if (sinkEffect) { stageInput = RebuildPureStageWithSink( - node.Input(), node.Table(), node.Columns(), + node.Input(), node.Table(), settings.AllowInconsistentWrites, settings.Mode, ctx); effect = Build(ctx, node.Pos()) .Stage(stageInput.Cast().Ptr()) @@ -349,7 +348,6 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const .Index().Value("0").Build() .Settings() .Table(node.Table()) - .Columns(node.Columns()) .InconsistentWrite(settings.AllowInconsistentWrites ? ctx.NewAtom(node.Pos(), "true") : ctx.NewAtom(node.Pos(), "false")) @@ -459,7 +457,7 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const if (IsDqPureExpr(node.Input())) { if (sinkEffect) { const auto keyColumns = BuildKeyColumnsList(table, node.Pos(), ctx); - stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), keyColumns, false, "delete", ctx); + stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), false, "delete", ctx); effect = Build(ctx, node.Pos()) .Stage(stageInput.Cast().Ptr()) .SinkIndex().Build("0") @@ -486,7 +484,6 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const auto input = dqUnion.Output().Stage().Program().Body(); if (sinkEffect) { - const auto keyColumns = BuildKeyColumnsList(table, node.Pos(), ctx); auto sink = Build(ctx, node.Pos()) .DataSink() .Category(ctx.NewAtom(node.Pos(), NYql::KqpTableSinkName)) @@ -495,7 +492,6 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const .Index().Value("0").Build() .Settings() .Table(node.Table()) - .Columns(keyColumns) .InconsistentWrite(ctx.NewAtom(node.Pos(), "false")) .Mode(ctx.NewAtom(node.Pos(), "delete")) .Settings() diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index e438a1a1017b..209f61846388 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -149,6 +149,16 @@ void FillTablesMap(const TKqpTable& table, const TCoAtomList& columns, } } +void FillTablesMap(const TKqpTable& table, const TVector& columns, + THashMap>& tablesMap) +{ + FillTablesMap(table, tablesMap); + + for (const auto& column : columns) { + tablesMap[table.Path()].emplace(column); + } +} + void FillTable(const TKikimrTableMetadata& tableMeta, THashSet&& columns, NKqpProto::TKqpPhyTable& tableProto) { @@ -808,7 +818,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { YQL_ENSURE(maybeSinkNode); auto sinkNode = maybeSinkNode.Cast(); auto* sinkProto = stageProto.AddSinks(); - FillSink(sinkNode, sinkProto, tablesMap, ctx); + FillSink(sinkNode, sinkProto, tablesMap, stage, ctx); sinkProto->SetOutputIndex(FromString(TStringBuf(sinkNode.Index()))); if (IsTableSink(sinkNode.DataSink().Cast().Category())) { @@ -1074,19 +1084,34 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { } } - void FillKqpSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap>& tablesMap) { + void FillKqpSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap>& tablesMap, const TDqPhyStage& stage) { if (auto settings = sink.Settings().Maybe()) { NKqpProto::TKqpInternalSink& internalSinkProto = *protoSink->MutableInternalSink(); internalSinkProto.SetType(TString(NYql::KqpTableSinkName)); NKikimrKqp::TKqpTableSinkSettings settingsProto; - FillTablesMap(settings.Table().Cast(), settings.Columns().Cast(), tablesMap); + + const auto& tupleType = stage.Ref().GetTypeAnn()->Cast(); + YQL_ENSURE(tupleType); + YQL_ENSURE(tupleType->GetSize() == 1); + const auto& listType = tupleType->GetItems()[0]->Cast(); + YQL_ENSURE(listType); + const auto& structType = listType->GetItemType()->Cast(); + YQL_ENSURE(structType); + + TVector columns; + columns.reserve(structType->GetSize()); + for (const auto& item : structType->GetItems()) { + columns.emplace_back(item->GetName()); + } + + FillTablesMap(settings.Table().Cast(), columns, tablesMap); FillTableId(settings.Table().Cast(), *settingsProto.MutableTable()); const auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata; - auto fillColumnProto = [] (TString columnName, const NYql::TKikimrColumnMetadata* column, NKikimrKqp::TKqpColumnMetadataProto* columnProto ) { + auto fillColumnProto = [] (TStringBuf columnName, const NYql::TKikimrColumnMetadata* column, NKikimrKqp::TKqpColumnMetadataProto* columnProto ) { columnProto->SetId(column->Id); - columnProto->SetName(columnName); + columnProto->SetName(TString(columnName)); columnProto->SetTypeId(column->TypeInfo.GetTypeId()); if(NScheme::NTypeIds::IsParametrizedType(column->TypeInfo.GetTypeId())) { @@ -1096,16 +1121,15 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { for (const auto& columnName : tableMeta->KeyColumnNames) { const auto columnMeta = tableMeta->Columns.FindPtr(columnName); - YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + columnName + "\""); + YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + TString(columnName) + "\""); auto keyColumnProto = settingsProto.AddKeyColumns(); fillColumnProto(columnName, columnMeta, keyColumnProto); } - for (const auto& column : settings.Columns().Cast()) { - const auto columnName = column.StringValue(); + for (const auto& columnName : columns) { const auto columnMeta = tableMeta->Columns.FindPtr(columnName); - YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + columnName + "\""); + YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + TString(columnName) + "\""); auto columnProto = settingsProto.AddColumns(); fillColumnProto(columnName, columnMeta, columnProto); @@ -1141,11 +1165,11 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { || dataSinkCategory == NYql::KqpTableSinkName; } - void FillSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap>& tablesMap, TExprContext& ctx) { + void FillSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap>& tablesMap, const TDqPhyStage& stage, TExprContext& ctx) { Y_UNUSED(ctx); const TStringBuf dataSinkCategory = sink.DataSink().Cast().Category(); if (IsTableSink(dataSinkCategory)) { - FillKqpSink(sink, protoSink, tablesMap); + FillKqpSink(sink, protoSink, tablesMap, stage); } else { // Delegate sink filling to dq integration of specific provider const auto provider = TypesCtx.DataSinkMap.find(dataSinkCategory); diff --git a/ydb/core/kqp/ut/olap/delete_ut.cpp b/ydb/core/kqp/ut/olap/delete_ut.cpp new file mode 100644 index 000000000000..cf1dd2999914 --- /dev/null +++ b/ydb/core/kqp/ut/olap/delete_ut.cpp @@ -0,0 +1,48 @@ +#include + +#include + +namespace NKikimr::NKqp { +Y_UNIT_TEST_SUITE(KqpOlapDelete) { + Y_UNIT_TEST_TWIN(DeleteWithDiffrentTypesPKColumns, isStream) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + auto runnerSettings = TKikimrSettings().SetAppConfig(appConfig).SetWithSampleTables(true); + + TTestHelper testHelper(runnerSettings); + auto client = testHelper.GetKikimr().GetQueryClient(); + + TVector schema = { + TTestHelper::TColumnSchema().SetName("time").SetType(NScheme::NTypeIds::Timestamp).SetNullable(false), + TTestHelper::TColumnSchema().SetName("class").SetType(NScheme::NTypeIds::Utf8).SetNullable(false), + TTestHelper::TColumnSchema().SetName("uniq").SetType(NScheme::NTypeIds::Utf8).SetNullable(false), + }; + + TTestHelper::TColumnTable testTable; + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "time", "class", "uniq" }).SetSchema(schema); + testHelper.CreateTable(testTable); + + auto ts = TInstant::Now(); + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + tableInserter.AddRow().Add(ts.MicroSeconds()).Add("test").Add("test"); + testHelper.BulkUpsert(testTable, tableInserter); + } + + + if (isStream) { + auto deleteQuery = "DELETE FROM `/Root/ColumnTableTest` ON SELECT * FROM `/Root/ColumnTableTest`"; + auto deleteQueryResult = client.ExecuteQuery(deleteQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(deleteQueryResult.IsSuccess(), deleteQueryResult.GetIssues().ToString()); + } else { + auto deleteQuery = TStringBuilder() << "DELETE FROM `/Root/ColumnTableTest` WHERE Cast(DateTime::MakeDate(DateTime::StartOfDay(time)) as String) == \"" + << ts.FormatLocalTime("%Y-%m-%d") + << "\" and class == \"test\" and uniq = \"test\";"; + auto deleteQueryResult = client.ExecuteQuery(deleteQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(deleteQueryResult.IsSuccess(), deleteQueryResult.GetIssues().ToString()); + } + + testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest`", "[]"); + } +} +} \ No newline at end of file diff --git a/ydb/core/kqp/ut/olap/ya.make b/ydb/core/kqp/ut/olap/ya.make index ca9b32852601..e7b79479f08a 100644 --- a/ydb/core/kqp/ut/olap/ya.make +++ b/ydb/core/kqp/ut/olap/ya.make @@ -13,6 +13,7 @@ ELSE() ENDIF() SRCS( + delete_ut.cpp kqp_olap_stats_ut.cpp GLOBAL kqp_olap_ut.cpp sys_view_ut.cpp diff --git a/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp index 4959cf240e05..bc4f31996137 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp @@ -144,21 +144,11 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) { result = session.ExecuteQuery(Q_(R"( UPDATE `/Root/KV` SET Value = "third" WHERE Key = 4; )"), TTxControl::Tx(tx->GetId())).ExtractValueSync(); - if (GetIsOlap()) { - // Olap has Reads in this query, so it breaks now. - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); - } else { - // Oltp doesn't have Reads in this query, so it breaks later. - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); auto commitResult = tx->Commit().ExtractValueSync(); - if (GetIsOlap()) { - UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::NOT_FOUND, commitResult.GetIssues().ToString()); - } else { - UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString()); - } + UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::NOT_FOUND, commitResult.GetIssues().ToString()); } };