Skip to content

Commit

Permalink
Remove dependncy on <ydb/library/yql/dq/runtime/dq_arrow_helpers.h> (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Tony-Romanov authored Feb 1, 2024
1 parent 9cddf4b commit f2e86bd
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 23 deletions.
1 change: 0 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <ydb/public/api/protos/ydb_rate_limiter.pb.h>

#include <ydb/library/yql/dq/runtime/dq_transport.h>
#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h>

#include <ydb/library/actors/core/log.h>

Expand Down
37 changes: 16 additions & 21 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <ydb/core/tx/schemeshard/olap/schema/schema.h>

#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h>
#include <ydb/library/yql/public/udf/arrow/block_builder.h>

#include <ydb/library/actors/core/log.h>

Expand Down Expand Up @@ -52,31 +52,26 @@ std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo&
const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
std::vector<std::shared_ptr<arrow::Field>> columns;
std::vector<std::shared_ptr<arrow::Array>> data;
auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames;

columns.reserve(parameterNames.size());
data.reserve(parameterNames.size());
if (const auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames; !parameterNames.empty()) {
columns.reserve(parameterNames.size());
data.reserve(parameterNames.size());

for (auto& name : stage.GetProgramParameters()) {
if (!parameterNames.contains(name)) {
continue;
}

auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name);
YQL_ENSURE(NYql::NArrow::IsArrowCompatible(type), "Incompatible parameter type. Can't convert to arrow");

std::unique_ptr<arrow::ArrayBuilder> builder = NYql::NArrow::MakeArrowBuilder(type);
NYql::NArrow::AppendElement(value, builder.get(), type);

std::shared_ptr<arrow::Array> array;
auto status = builder->Finish(&array);
for (const auto& name : stage.GetProgramParameters()) {
if (!parameterNames.contains(name)) {
continue;
}

YQL_ENSURE(status.ok(), "Failed to build arrow array of variables.");
const auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name);
const auto builder = NUdf::MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), type, *arrow::default_memory_pool(), 1U, nullptr);
builder->Add(value);
const auto datum = builder->Build(true);

auto field = std::make_shared<arrow::Field>(name, array->type());
auto field = std::make_shared<arrow::Field>(name, datum.type());

columns.emplace_back(std::move(field));
data.emplace_back(std::move(array));
columns.emplace_back(std::move(field));
data.emplace_back(datum.make_array());
}
}

auto schema = std::make_shared<arrow::Schema>(std::move(columns));
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_tasks_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TStageInfo& stageIn
ui64 txId, bool enableSpilling);

NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(TKqpTasksGraph& tasksGraph, const TTask& task, bool serializeAsyncIoSettings);
void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, NYql::NDqProto::TDqTask* message, bool serializeAsyncIoSettings);
void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta);
void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, bool enableSpilling);
Expand Down

0 comments on commit f2e86bd

Please sign in to comment.