Skip to content

Commit

Permalink
Merge 02d993e into 8372e70
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Oct 11, 2024
2 parents 8372e70 + 02d993e commit d084a79
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 18 deletions.
7 changes: 3 additions & 4 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,9 @@
"Match": {"Type": "Callable", "Name": "KqpTableSinkSettings"},
"Children": [
{"Index": 0, "Name": "Table", "Type": "TKqpTable"},
{"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
{"Index": 2, "Name": "InconsistentWrite", "Type": "TCoAtom"},
{"Index": 3, "Name": "Mode", "Type": "TCoAtom"},
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
{"Index": 1, "Name": "InconsistentWrite", "Type": "TCoAtom"},
{"Index": 2, "Name": "Mode", "Type": "TCoAtom"},
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
]
},
{
Expand Down
10 changes: 3 additions & 7 deletions ydb/core/kqp/opt/kqp_opt_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ TCoAtomList BuildKeyColumnsList(const TKikimrTableDescription& table, TPositionH
.Done();
}

TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const TCoAtomList& columns,
TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table,
const bool allowInconsistentWrites, const TStringBuf mode, TExprContext& ctx) {
Y_DEBUG_ABORT_UNLESS(IsDqPureExpr(expr));

Expand All @@ -253,7 +253,6 @@ TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const
.Index().Value("0").Build()
.Settings<TKqpTableSinkSettings>()
.Table(table)
.Columns(columns)
.InconsistentWrite(allowInconsistentWrites
? ctx.NewAtom(expr.Pos(), "true")
: ctx.NewAtom(expr.Pos(), "false"))
Expand Down Expand Up @@ -311,7 +310,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
if (IsDqPureExpr(node.Input())) {
if (sinkEffect) {
stageInput = RebuildPureStageWithSink(
node.Input(), node.Table(), node.Columns(),
node.Input(), node.Table(),
settings.AllowInconsistentWrites, settings.Mode, ctx);
effect = Build<TKqpSinkEffect>(ctx, node.Pos())
.Stage(stageInput.Cast().Ptr())
Expand Down Expand Up @@ -349,7 +348,6 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
.Index().Value("0").Build()
.Settings<TKqpTableSinkSettings>()
.Table(node.Table())
.Columns(node.Columns())
.InconsistentWrite(settings.AllowInconsistentWrites
? ctx.NewAtom(node.Pos(), "true")
: ctx.NewAtom(node.Pos(), "false"))
Expand Down Expand Up @@ -459,7 +457,7 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
if (IsDqPureExpr(node.Input())) {
if (sinkEffect) {
const auto keyColumns = BuildKeyColumnsList(table, node.Pos(), ctx);
stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), keyColumns, false, "delete", ctx);
stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), false, "delete", ctx);
effect = Build<TKqpSinkEffect>(ctx, node.Pos())
.Stage(stageInput.Cast().Ptr())
.SinkIndex().Build("0")
Expand All @@ -486,7 +484,6 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
auto input = dqUnion.Output().Stage().Program().Body();

if (sinkEffect) {
const auto keyColumns = BuildKeyColumnsList(table, node.Pos(), ctx);
auto sink = Build<TDqSink>(ctx, node.Pos())
.DataSink<TKqpTableSink>()
.Category(ctx.NewAtom(node.Pos(), NYql::KqpTableSinkName))
Expand All @@ -495,7 +492,6 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
.Index().Value("0").Build()
.Settings<TKqpTableSinkSettings>()
.Table(node.Table())
.Columns(keyColumns)
.InconsistentWrite(ctx.NewAtom(node.Pos(), "false"))
.Mode(ctx.NewAtom(node.Pos(), "delete"))
.Settings()
Expand Down
38 changes: 31 additions & 7 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ void FillTablesMap(const TKqpTable& table, const TCoAtomList& columns,
}
}

void FillTablesMap(const TKqpTable& table, const TVector<TString>& columns,
THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
{
FillTablesMap(table, tablesMap);

for (const auto& column : columns) {
tablesMap[table.Path()].emplace(column);
}
}

void FillTable(const TKikimrTableMetadata& tableMeta, THashSet<TStringBuf>&& columns,
NKqpProto::TKqpPhyTable& tableProto)
{
Expand Down Expand Up @@ -808,7 +818,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
YQL_ENSURE(maybeSinkNode);
auto sinkNode = maybeSinkNode.Cast();
auto* sinkProto = stageProto.AddSinks();
FillSink(sinkNode, sinkProto, tablesMap, ctx);
FillSink(sinkNode, sinkProto, tablesMap, stage, ctx);
sinkProto->SetOutputIndex(FromString(TStringBuf(sinkNode.Index())));

if (IsTableSink(sinkNode.DataSink().Cast<TCoDataSink>().Category())) {
Expand Down Expand Up @@ -1074,12 +1084,27 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
}
}

void FillKqpSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) {
void FillKqpSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, const TDqPhyStage& stage) {
if (auto settings = sink.Settings().Maybe<TKqpTableSinkSettings>()) {
NKqpProto::TKqpInternalSink& internalSinkProto = *protoSink->MutableInternalSink();
internalSinkProto.SetType(TString(NYql::KqpTableSinkName));
NKikimrKqp::TKqpTableSinkSettings settingsProto;
FillTablesMap(settings.Table().Cast(), settings.Columns().Cast(), tablesMap);

const auto& tupleType = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>();
YQL_ENSURE(tupleType);
YQL_ENSURE(tupleType->GetSize() == 1);
const auto& listType = tupleType->GetItems()[0]->Cast<TListExprType>();
YQL_ENSURE(listType);
const auto& structType = listType->GetItemType()->Cast<TStructExprType>();
YQL_ENSURE(structType);

TVector<TString> columns;
columns.reserve(structType->GetSize());
for (const auto& item : structType->GetItems()) {
columns.emplace_back(item->GetName());
}

FillTablesMap(settings.Table().Cast(), columns, tablesMap);
FillTableId(settings.Table().Cast(), *settingsProto.MutableTable());

const auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata;
Expand All @@ -1102,8 +1127,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
fillColumnProto(columnName, columnMeta, keyColumnProto);
}

for (const auto& column : settings.Columns().Cast()) {
const auto columnName = column.StringValue();
for (const auto& columnName : columns) {
const auto columnMeta = tableMeta->Columns.FindPtr(columnName);
YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + columnName + "\"");

Expand Down Expand Up @@ -1141,11 +1165,11 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
|| dataSinkCategory == NYql::KqpTableSinkName;
}

void FillSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, TExprContext& ctx) {
void FillSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, const TDqPhyStage& stage, TExprContext& ctx) {
Y_UNUSED(ctx);
const TStringBuf dataSinkCategory = sink.DataSink().Cast<TCoDataSink>().Category();
if (IsTableSink(dataSinkCategory)) {
FillKqpSink(sink, protoSink, tablesMap);
FillKqpSink(sink, protoSink, tablesMap, stage);
} else {
// Delegate sink filling to dq integration of specific provider
const auto provider = TypesCtx.DataSinkMap.find(dataSinkCategory);
Expand Down
48 changes: 48 additions & 0 deletions ydb/core/kqp/ut/olap/delete_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include <ydb/core/kqp/ut/common/columnshard.h>

#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NKqp {
Y_UNIT_TEST_SUITE(KqpOlapDelete) {
Y_UNIT_TEST_TWIN(DeleteWithDiffrentTypesPKColumns, isStream) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
auto runnerSettings = TKikimrSettings().SetAppConfig(appConfig).SetWithSampleTables(true);

TTestHelper testHelper(runnerSettings);
auto client = testHelper.GetKikimr().GetQueryClient();

TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("time").SetType(NScheme::NTypeIds::Timestamp).SetNullable(false),
TTestHelper::TColumnSchema().SetName("class").SetType(NScheme::NTypeIds::Utf8).SetNullable(false),
TTestHelper::TColumnSchema().SetName("uniq").SetType(NScheme::NTypeIds::Utf8).SetNullable(false),
};

TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "time", "class", "uniq" }).SetSchema(schema);
testHelper.CreateTable(testTable);

auto ts = TInstant::Now();
{
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
tableInserter.AddRow().Add(ts.MicroSeconds()).Add("test").Add("test");
testHelper.BulkUpsert(testTable, tableInserter);
}


if (isStream) {
auto deleteQuery = "DELETE FROM `/Root/ColumnTableTest` ON SELECT * FROM `/Root/ColumnTableTest`";
auto deleteQueryResult = client.ExecuteQuery(deleteQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(deleteQueryResult.IsSuccess(), deleteQueryResult.GetIssues().ToString());
} else {
auto deleteQuery = TStringBuilder() << "DELETE FROM `/Root/ColumnTableTest` WHERE Cast(DateTime::MakeDate(DateTime::StartOfDay(time)) as String) == \""
<< ts.FormatLocalTime("%Y-%m-%d")
<< "\" and class == \"test\" and uniq = \"test\";";
auto deleteQueryResult = client.ExecuteQuery(deleteQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(deleteQueryResult.IsSuccess(), deleteQueryResult.GetIssues().ToString());
}

testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest`", "[]");
}
}
}
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ELSE()
ENDIF()

SRCS(
delete_ut.cpp
kqp_olap_stats_ut.cpp
GLOBAL kqp_olap_ut.cpp
sys_view_ut.cpp
Expand Down

0 comments on commit d084a79

Please sign in to comment.