Skip to content

Commit

Permalink
Improve WriteActor (#12167)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Dec 3, 2024
1 parent e7cb027 commit 9a920b4
Show file tree
Hide file tree
Showing 6 changed files with 532 additions and 442 deletions.
6 changes: 5 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
YQL_ENSURE(stage.SinksSize() == 1);
meta.TableId = MakeTableId(settings.GetTable());
meta.TablePath = settings.GetTable().GetPath();
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Update);
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_DELETE) {
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Erase);
} else {
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Update);
}
meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
}
}
Expand Down
34 changes: 34 additions & 0 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,32 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
}
}

THashMap<TStringBuf, ui32> CreateColumnToOrder(
const TVector<TStringBuf>& columns,
const TKikimrTableMetadataPtr& tableMeta,
bool keysFirst) {
THashSet<TStringBuf> usedColumns;
for (const auto& columnName : columns) {
usedColumns.insert(columnName);
}

THashMap<TStringBuf, ui32> columnToOrder;
ui32 number = 0;
if (keysFirst) {
for (const auto& columnName : tableMeta->KeyColumnNames) {
YQL_ENSURE(usedColumns.contains(columnName));
columnToOrder[columnName] = number++;
}
}
for (const auto& columnName : tableMeta->ColumnOrder) {
if (usedColumns.contains(columnName) && !columnToOrder.contains(columnName)) {
columnToOrder[columnName] = number++;
}
}

return columnToOrder;
}

void FillKqpSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, const TDqPhyStage& stage) {
if (auto settings = sink.Settings().Maybe<TKqpTableSinkSettings>()) {
NKqpProto::TKqpInternalSink& internalSinkProto = *protoSink->MutableInternalSink();
Expand Down Expand Up @@ -1165,6 +1191,14 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
fillColumnProto(columnName, columnMeta, columnProto);
}

const auto columnToOrder = CreateColumnToOrder(
columns,
tableMeta,
settings.TableType().Cast().StringValue() == "oltp");
for (const auto& columnName : columns) {
settingsProto.AddWriteIndexes(columnToOrder.at(columnName));
}

if (const auto inconsistentWrite = settings.InconsistentWrite().Cast(); inconsistentWrite.StringValue() == "true") {
settingsProto.SetInconsistentTx(true);
}
Expand Down
Loading

0 comments on commit 9a920b4

Please sign in to comment.