Skip to content

Commit

Permalink
YQL-18053: Add block implementation for AsStruct callable (ydb-platfo…
Browse files Browse the repository at this point in the history
  • Loading branch information
igormunkin authored Apr 22, 2024
1 parent b56212c commit f1bf06c
Show file tree
Hide file tree
Showing 60 changed files with 448 additions and 334 deletions.
25 changes: 24 additions & 1 deletion ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5570,7 +5570,7 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
std::string_view arrowFunctionName;
const bool rewriteAsIs = node->IsCallable({"AssumeStrict", "AssumeNonStrict", "Likely"});
if (node->IsList() || rewriteAsIs ||
node->IsCallable({"And", "Or", "Xor", "Not", "Coalesce", "Exists", "If", "Just", "Member", "Nth", "ToPg", "FromPg", "PgResolvedCall", "PgResolvedOp"}))
node->IsCallable({"And", "Or", "Xor", "Not", "Coalesce", "Exists", "If", "Just", "AsStruct", "Member", "Nth", "ToPg", "FromPg", "PgResolvedCall", "PgResolvedOp"}))
{
if (node->IsCallable() && !IsSupportedAsBlockType(node->Pos(), *node->GetTypeAnn(), ctx, types)) {
return true;
Expand Down Expand Up @@ -5609,6 +5609,29 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
}
}

// <AsStruct> arguments (i.e. members of the resulting structure)
// are literal tuples, that don't propagate their child rewrites.
// Hence, process these rewrites the following way: wrap the
// complete expressions, supported by the block engine, with
// <AsScalar> callable or apply the rewrite of one is found.
// Otherwise, abort this <AsStruct> rewrite, since one of its
// arguments is neither block nor scalar.
if (node->IsCallable("AsStruct")) {
for (ui32 index = 0; index < node->ChildrenSize(); index++) {
auto member = funcArgs[index];
auto child = member->TailPtr();
TExprNodePtr rewrite;
if (child->IsComplete() && IsSupportedAsBlockType(child->Pos(), *child->GetTypeAnn(), ctx, types)) {
rewrite = ctx.NewCallable(child->Pos(), "AsScalar", { child });
} else if (auto rit = rewrites.find(child.Get()); rit != rewrites.end()) {
rewrite = rit->second;
} else {
return true;
}
funcArgs[index] = ctx.NewList(member->Pos(), {member->HeadPtr(), rewrite});
}
}

const TString blockFuncName = rewriteAsIs ? ToString(node->Content()) :
(TString("Block") + (node->IsList() ? "AsTuple" : node->Content()));
if (node->IsCallable({"And", "Or", "Xor"}) && funcArgs.size() > 2) {
Expand Down
51 changes: 51 additions & 0 deletions ydb/library/yql/core/type_ann/type_ann_blocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,57 @@ IGraphTransformer::TStatus BlockJustWrapper(const TExprNode::TPtr& input, TExprN
return IGraphTransformer::TStatus::Ok;
}

IGraphTransformer::TStatus BlockAsStructWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

TVector<const TItemExprType*> members;
bool onlyScalars = true;
for (auto& child : input->Children()) {
auto nameNode = child->Child(0);
if (!EnsureAtom(*nameNode, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
auto valueNode = child->Child(1);
if (!EnsureBlockOrScalarType(*valueNode, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

bool isScalar;
const TTypeAnnotationNode* blockItemType = GetBlockItemType(*valueNode->GetTypeAnn(), isScalar);

onlyScalars = onlyScalars && isScalar;
members.push_back(ctx.Expr.MakeType<TItemExprType>(nameNode->Content(), blockItemType));
}

auto structType = ctx.Expr.MakeType<TStructExprType>(members);
if (!structType->Validate(input->Pos(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

auto less = [](const TExprNode::TPtr& left, const TExprNode::TPtr& right) {
return left->Head().Content() < right->Head().Content();
};

if (!IsSorted(input->Children().begin(), input->Children().end(), less)) {
auto list = input->ChildrenList();
Sort(list.begin(), list.end(), less);
output = ctx.Expr.ChangeChildren(*input, std::move(list));
return IGraphTransformer::TStatus::Repeat;
}

const TTypeAnnotationNode* resultType;
if (onlyScalars) {
resultType = ctx.Expr.MakeType<TScalarExprType>(structType);
} else {
resultType = ctx.Expr.MakeType<TBlockExprType>(structType);
}
input->SetTypeAnn(resultType);
return IGraphTransformer::TStatus::Ok;
}

IGraphTransformer::TStatus BlockAsTupleWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/type_ann/type_ann_blocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus BlockLogicalWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockIfWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockJustWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockAsStructWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockAsTupleWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockNthWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockMemberWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/type_ann/type_ann_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12286,6 +12286,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["BlockNot"] = &BlockLogicalWrapper;
Functions["BlockIf"] = &BlockIfWrapper;
Functions["BlockJust"] = &BlockJustWrapper;
Functions["BlockAsStruct"] = &BlockAsStructWrapper;
Functions["BlockAsTuple"] = &BlockAsTupleWrapper;
Functions["BlockMember"] = &BlockMemberWrapper;
Functions["BlockNth"] = &BlockNthWrapper;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/yql_expr_constraint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
Functions["Limit"] = &TCallableConstraintTransformer::TakeWrap;
Functions["Member"] = &TCallableConstraintTransformer::MemberWrap;
Functions["AsStruct"] = &TCallableConstraintTransformer::AsStructWrap;
Functions["BlockAsStruct"] = &TCallableConstraintTransformer::AsStructWrap;
Functions["Just"] = &TCallableConstraintTransformer::FromFirst<TPassthroughConstraintNode, TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode, TPartOfSortedConstraintNode, TPartOfChoppedConstraintNode, TVarIndexConstraintNode, TMultiConstraintNode>;
Functions["Unwrap"] = &TCallableConstraintTransformer::FromFirst<TPassthroughConstraintNode, TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode, TPartOfSortedConstraintNode, TPartOfChoppedConstraintNode, TVarIndexConstraintNode, TMultiConstraintNode>;
Functions["Ensure"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "mkql_block_tuple.h"
#include "mkql_block_container.h"

#include <ydb/library/yql/minikql/computation/mkql_block_impl.h>

Expand All @@ -15,9 +15,9 @@ namespace NMiniKQL {

namespace {

class TBlockAsTupleExec {
class TBlockAsContainerExec {
public:
TBlockAsTupleExec(const TVector<TType*>& argTypes, const std::shared_ptr<arrow::DataType>& returnArrowType)
TBlockAsContainerExec(const TVector<TType*>& argTypes, const std::shared_ptr<arrow::DataType>& returnArrowType)
: ArgTypes(argTypes)
, ReturnArrowType(returnArrowType)
{}
Expand Down Expand Up @@ -66,10 +66,10 @@ class TBlockAsTupleExec {
const std::shared_ptr<arrow::DataType> ReturnArrowType;
};

std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockAsTupleKernel(const TVector<TType*>& argTypes, TType* resultType) {
std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockAsContainerKernel(const TVector<TType*>& argTypes, TType* resultType) {
std::shared_ptr<arrow::DataType> returnArrowType;
MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type");
auto exec = std::make_shared<TBlockAsTupleExec>(argTypes, returnArrowType);
auto exec = std::make_shared<TBlockAsContainerExec>(argTypes, returnArrowType);
auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType),
[exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
return exec->Exec(ctx, batch, res);
Expand All @@ -81,17 +81,17 @@ std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockAsTupleKernel(const TVect

} // namespace

IComputationNode* WrapBlockAsTuple(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
IComputationNode* WrapBlockAsContainer(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
TComputationNodePtrVector argsNodes;
TVector<TType*> argsTypes;
for (ui32 i = 0; i < callable.GetInputsCount(); ++i) {
argsNodes.push_back(LocateNode(ctx.NodeLocator, callable, i));
argsTypes.push_back(callable.GetInput(i).GetStaticType());
}

auto kernel = MakeBlockAsTupleKernel(argsTypes, callable.GetType()->GetReturnType());
auto kernel = MakeBlockAsContainerKernel(argsTypes, callable.GetType()->GetReturnType());
return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel);
}

}
}
} // namespace NMiniKQL
} // namespace NKikimr
10 changes: 10 additions & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_block_container.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma once
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>

namespace NKikimr {
namespace NMiniKQL {

IComputationNode* WrapBlockAsContainer(TCallable& callable, const TComputationNodeFactoryContext& ctx);

} // namespace NMiniKQL
} // namespace NKikimr
10 changes: 0 additions & 10 deletions ydb/library/yql/minikql/comp_nodes/mkql_block_tuple.h

This file was deleted.

5 changes: 3 additions & 2 deletions ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "mkql_blocks.h"
#include "mkql_block_agg.h"
#include "mkql_block_coalesce.h"
#include "mkql_block_container.h"
#include "mkql_block_exists.h"
#include "mkql_block_getelem.h"
#include "mkql_block_if.h"
Expand All @@ -16,7 +17,6 @@
#include "mkql_block_compress.h"
#include "mkql_block_skiptake.h"
#include "mkql_block_top.h"
#include "mkql_block_tuple.h"
#include "mkql_callable.h"
#include "mkql_chain_map.h"
#include "mkql_chain1_map.h"
Expand Down Expand Up @@ -297,7 +297,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"BlockNot", &WrapBlockNot},
{"BlockJust", &WrapBlockJust},
{"BlockCompress", &WrapBlockCompress},
{"BlockAsTuple", &WrapBlockAsTuple},
{"BlockAsTuple", &WrapBlockAsContainer},
{"BlockAsStruct", &WrapBlockAsContainer},
{"BlockMember", &WrapBlockMember},
{"BlockNth", &WrapBlockNth},
{"BlockExpandChunked", &WrapBlockExpandChunked},
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/minikql/comp_nodes/ya.make.inc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ SET(ORIG_SOURCES
mkql_block_agg_some.cpp
mkql_block_agg_sum.cpp
mkql_block_coalesce.cpp
mkql_block_container.cpp
mkql_block_exists.cpp
mkql_block_getelem.cpp
mkql_block_if.cpp
Expand All @@ -24,7 +25,6 @@ SET(ORIG_SOURCES
mkql_block_func.cpp
mkql_block_skiptake.cpp
mkql_block_top.cpp
mkql_block_tuple.cpp
mkql_blocks.cpp
mkql_callable.cpp
mkql_chain_map.cpp
Expand Down
22 changes: 22 additions & 0 deletions ydb/library/yql/minikql/mkql_program_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,28 @@ TRuntimeNode TProgramBuilder::BlockNth(TRuntimeNode tuple, ui32 index) {
return TRuntimeNode(callableBuilder.Build(), false);
}

TRuntimeNode TProgramBuilder::BlockAsStruct(const TArrayRef<std::pair<std::string_view, TRuntimeNode>>& args) {
MKQL_ENSURE(!args.empty(), "Expected at least one argument");

TBlockType::EShape resultShape = TBlockType::EShape::Scalar;
TVector<std::pair<std::string_view, TType*>> members;
for (const auto& x : args) {
auto blockType = AS_TYPE(TBlockType, x.second.GetStaticType());
members.emplace_back(x.first, blockType->GetItemType());
if (blockType->GetShape() == TBlockType::EShape::Many) {
resultShape = TBlockType::EShape::Many;
}
}

auto returnType = NewBlockType(NewStructType(members), resultShape);
TCallableBuilder callableBuilder(Env, __func__, returnType);
for (const auto& x : args) {
callableBuilder.Add(x.second);
}

return TRuntimeNode(callableBuilder.Build(), false);
}

TRuntimeNode TProgramBuilder::BlockAsTuple(const TArrayRef<const TRuntimeNode>& args) {
MKQL_ENSURE(!args.empty(), "Expected at least one argument");

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/minikql/mkql_program_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ class TProgramBuilder : public TTypeBuilder {
TRuntimeNode BlockExists(TRuntimeNode data);
TRuntimeNode BlockMember(TRuntimeNode structure, const std::string_view& memberName);
TRuntimeNode BlockNth(TRuntimeNode tuple, ui32 index);
TRuntimeNode BlockAsStruct(const TArrayRef<std::pair<std::string_view, TRuntimeNode>>& args);
TRuntimeNode BlockAsTuple(const TArrayRef<const TRuntimeNode>& args);
TRuntimeNode BlockToPg(TRuntimeNode input, TType* returnType);
TRuntimeNode BlockFromPg(TRuntimeNode input, TType* returnType);
Expand Down
8 changes: 8 additions & 0 deletions ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2737,6 +2737,14 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.BlockNth(tupleObj, index);
});

AddCallable("BlockAsStruct", [](const TExprNode& node, TMkqlBuildContext& ctx) {
std::vector<std::pair<std::string_view, TRuntimeNode>> members;
for (const auto& x : node.Children()) {
members.emplace_back(x->Head().Content(), MkqlBuildExpr(x->Tail(), ctx));
}
return ctx.ProgramBuilder.BlockAsStruct(members);
});

AddCallable("BlockAsTuple", [](const TExprNode& node, TMkqlBuildContext& ctx) {
TVector<TRuntimeNode> args;
for (const auto& x : node.Children()) {
Expand Down
24 changes: 12 additions & 12 deletions ydb/library/yql/tests/sql/dq_file/part11/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -535,23 +535,23 @@
"test.test[blocks-filter_direct_col--Results]": [],
"test.test[blocks-member--Analyze]": [
{
"checksum": "b08274fd137c1878d90520c832f06fd3",
"size": 3676,
"uri": "https://{canondata_backend}/1889210/75a1d72834c0a9de8b328ec130be934f6cc6cea0/resource.tar.gz#test.test_blocks-member--Analyze_/plan.txt"
"checksum": "4d80733bb5655340645b981057ba9910",
"size": 3703,
"uri": "https://{canondata_backend}/1942525/5635585a917e888e1628404d5ff137a2e18f23ca/resource.tar.gz#test.test_blocks-member--Analyze_/plan.txt"
}
],
"test.test[blocks-member--Debug]": [
{
"checksum": "a30b76ba380ee4f694dc731aa2771fed",
"size": 1467,
"uri": "https://{canondata_backend}/1937027/96028d31f8e29253c9276a86f02284e2a71add76/resource.tar.gz#test.test_blocks-member--Debug_/opt.yql_patched"
"checksum": "be5b35d7624905acc850940a1ff74780",
"size": 1837,
"uri": "https://{canondata_backend}/1775319/6624c18402d2e5473f3dcf5d9248a5e624496fd5/resource.tar.gz#test.test_blocks-member--Debug_/opt.yql_patched"
}
],
"test.test[blocks-member--Plan]": [
{
"checksum": "b08274fd137c1878d90520c832f06fd3",
"size": 3676,
"uri": "https://{canondata_backend}/1889210/75a1d72834c0a9de8b328ec130be934f6cc6cea0/resource.tar.gz#test.test_blocks-member--Plan_/plan.txt"
"checksum": "4d80733bb5655340645b981057ba9910",
"size": 3703,
"uri": "https://{canondata_backend}/1942525/5635585a917e888e1628404d5ff137a2e18f23ca/resource.tar.gz#test.test_blocks-member--Plan_/plan.txt"
}
],
"test.test[blocks-member--Results]": [],
Expand Down Expand Up @@ -586,9 +586,9 @@
],
"test.test[blocks-sort_two_mix--Debug]": [
{
"checksum": "60076d20175ed4eb32b22f7be43cb490",
"size": 1915,
"uri": "https://{canondata_backend}/1936947/a99026e839b7e22714c2a9a81971a3b5e3ed1eb4/resource.tar.gz#test.test_blocks-sort_two_mix--Debug_/opt.yql_patched"
"checksum": "8324668b9f66ec5f9fc3e70217a057e9",
"size": 2006,
"uri": "https://{canondata_backend}/1937429/5efa179cb9a9173602a23e7c0e313970073e2969/resource.tar.gz#test.test_blocks-sort_two_mix--Debug_/opt.yql_patched"
}
],
"test.test[blocks-sort_two_mix--Plan]": [
Expand Down
12 changes: 6 additions & 6 deletions ydb/library/yql/tests/sql/dq_file/part13/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -530,9 +530,9 @@
],
"test.test[blocks-sort_one_desc--Debug]": [
{
"checksum": "3ff65a165f6fd32950d8b56ba98d95b4",
"size": 1767,
"uri": "https://{canondata_backend}/1936997/93899b3de50fae3f9677baacc98094a7a629590a/resource.tar.gz#test.test_blocks-sort_one_desc--Debug_/opt.yql_patched"
"checksum": "7d4eab41033eb90eb99af4870531b0d6",
"size": 1827,
"uri": "https://{canondata_backend}/1937429/1a0fd6a532256a80615a0e2f24e1f1ec999cb7ef/resource.tar.gz#test.test_blocks-sort_one_desc--Debug_/opt.yql_patched"
}
],
"test.test[blocks-sort_one_desc--Plan]": [
Expand All @@ -552,9 +552,9 @@
],
"test.test[blocks-top_sort_one_desc--Debug]": [
{
"checksum": "be5f2a8fbecfbe65f97c1ba40a556497",
"size": 1806,
"uri": "https://{canondata_backend}/1936997/93899b3de50fae3f9677baacc98094a7a629590a/resource.tar.gz#test.test_blocks-top_sort_one_desc--Debug_/opt.yql_patched"
"checksum": "c8bfd2fd80dc1bf818f5f61d99ff8ba9",
"size": 1866,
"uri": "https://{canondata_backend}/1937429/1a0fd6a532256a80615a0e2f24e1f1ec999cb7ef/resource.tar.gz#test.test_blocks-top_sort_one_desc--Debug_/opt.yql_patched"
}
],
"test.test[blocks-top_sort_one_desc--Plan]": [
Expand Down
12 changes: 6 additions & 6 deletions ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -680,9 +680,9 @@
],
"test.test[blocks-sort_two_desc--Debug]": [
{
"checksum": "46f3a87a89918a4c8e91293f82f6f639",
"size": 1923,
"uri": "https://{canondata_backend}/1600758/aad142702907f13e911494c1a7b312bad34f692a/resource.tar.gz#test.test_blocks-sort_two_desc--Debug_/opt.yql_patched"
"checksum": "fdc217d5f0a13a7f060e53c0b26bf955",
"size": 2007,
"uri": "https://{canondata_backend}/1936273/dc087a913c2a638b764a20e1066eb33c1b05f57b/resource.tar.gz#test.test_blocks-sort_two_desc--Debug_/opt.yql_patched"
}
],
"test.test[blocks-sort_two_desc--Plan]": [
Expand All @@ -702,9 +702,9 @@
],
"test.test[blocks-top_sort_two_desc--Debug]": [
{
"checksum": "0e61c46d05ae7985462f918f60bab5d5",
"size": 1962,
"uri": "https://{canondata_backend}/1600758/aad142702907f13e911494c1a7b312bad34f692a/resource.tar.gz#test.test_blocks-top_sort_two_desc--Debug_/opt.yql_patched"
"checksum": "00c63032e96f06a8b468d46263b7087c",
"size": 2046,
"uri": "https://{canondata_backend}/1936273/dc087a913c2a638b764a20e1066eb33c1b05f57b/resource.tar.gz#test.test_blocks-top_sort_two_desc--Debug_/opt.yql_patched"
}
],
"test.test[blocks-top_sort_two_desc--Plan]": [
Expand Down
Loading

0 comments on commit f1bf06c

Please sign in to comment.