diff --git a/ydb/library/yql/core/services/yql_lineage.cpp b/ydb/library/yql/core/services/yql_lineage.cpp index 6ac9502559ff..6a3a57894633 100644 --- a/ydb/library/yql/core/services/yql_lineage.cpp +++ b/ydb/library/yql/core/services/yql_lineage.cpp @@ -298,7 +298,8 @@ class TLineageScanner { } TMaybe ScanExprLineage(const TExprNode& node, const TExprNode* arg, const TLineage* src, - TNodeMap>& visited) { + TNodeMap>& visited, + const THashMap& flattenColumns) { if (&node == arg) { return Nothing(); } @@ -308,6 +309,10 @@ class TLineageScanner { return it->second; } + if (auto itFlatten = flattenColumns.find(&node); itFlatten != flattenColumns.end()) { + return it->second = *(*src->Fields).FindPtr(itFlatten->second); + } + if (node.IsCallable("Member")) { if (&node.Head() == arg && src) { return it->second = *(*src->Fields).FindPtr(node.Tail().Content()); @@ -325,7 +330,7 @@ class TLineageScanner { } } - auto inner = ScanExprLineage(node.Head(), arg, src, visited); + auto inner = ScanExprLineage(node.Head(), arg, src, visited, {}); if (!inner) { return Nothing(); } @@ -365,7 +370,7 @@ class TLineageScanner { continue; } - auto inner = ScanExprLineage(*child, arg, src, visited); + auto inner = ScanExprLineage(*child, arg, src, visited, {}); if (!inner) { return Nothing(); } @@ -392,10 +397,11 @@ class TLineageScanner { } void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src, - TFieldLineageSet& dst, const TString& newTransforms = "") { + TFieldLineageSet& dst, const THashMap& flattenColumns, + const TString& newTransforms = "") { TNodeMap> visited; - auto res = ScanExprLineage(expr, &arg, &src, visited); + auto res = ScanExprLineage(expr, &arg, &src, visited, flattenColumns); if (!res) { for (const auto& f : *src.Fields) { for (const auto& i: f.second.Items) { @@ -410,7 +416,8 @@ class TLineageScanner { } void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src, - TFieldsLineage& dst, bool produceStruct, const TString& newTransforms = "") { + TFieldsLineage& dst, bool produceStruct, const THashMap& flattenColumns, + const TString& newTransforms = "") { if (produceStruct) { auto root = &expr; while (root->IsCallable("Just")) { @@ -427,7 +434,7 @@ class TLineageScanner { for (const auto& x : root->Children()) { auto fieldName = x->Head().Content(); auto& s = (*dst.StructItems)[fieldName]; - MergeLineageFromUsedFields(x->Tail(), arg, src, s, newTransforms); + MergeLineageFromUsedFields(x->Tail(), arg, src, s, flattenColumns, newTransforms); } } else if (root->IsCallable("Member") && &root->Head() == &arg) { auto fieldName = root->Tail().Content(); @@ -436,11 +443,11 @@ class TLineageScanner { } } - MergeLineageFromUsedFields(expr, arg, src, dst.Items, newTransforms); + MergeLineageFromUsedFields(expr, arg, src, dst.Items, flattenColumns, newTransforms); } void FillStructLineage(TLineage& lineage, const TExprNode* value, const TExprNode& arg, const TLineage& innerLineage, - const TTypeAnnotationNode* extType) { + const TTypeAnnotationNode* extType, const THashMap& flattenColumns) { TMaybe oneField; if (value && value->IsCallable("Member") && &value->Head() == &arg) { TString field(value->Tail().Content()); @@ -462,8 +469,8 @@ class TLineageScanner { TLineage left, right; left.Fields.ConstructInPlace(); right.Fields.ConstructInPlace(); - FillStructLineage(left, value->Child(1), arg, innerLineage, extType); - FillStructLineage(right, value->Child(2), arg, innerLineage, extType); + FillStructLineage(left, value->Child(1), arg, innerLineage, extType, {}); + FillStructLineage(right, value->Child(2), arg, innerLineage, extType, {}); for (const auto& f : *left.Fields) { auto& res = (*lineage.Fields)[f.first]; res.Items.insert(f.second.Items.begin(), f.second.Items.end()); @@ -483,7 +490,7 @@ class TLineageScanner { auto& res = (*lineage.Fields)[field]; const auto& expr = child->Tail(); TString newTransforms; - auto root = &expr; + const TExprNode* root = &expr; while (root->IsCallable("Just")) { root = &root->Head(); } @@ -492,7 +499,7 @@ class TLineageScanner { newTransforms = "Copy"; } - MergeLineageFromUsedFields(expr, arg, innerLineage, res, true, newTransforms); + MergeLineageFromUsedFields(expr, arg, innerLineage, res, true, flattenColumns, newTransforms); } return; @@ -526,13 +533,30 @@ class TLineageScanner { const auto& lambda = node.Tail(); const auto& arg = lambda.Head().Head(); const auto& body = lambda.Tail(); - const TExprNode* value; + THashMap flattenColumns; + const TExprNode* value = &body.Tail(); if (body.IsCallable({"OptionalIf", "FlatListIf"})) { value = &body.Tail(); } else if (body.IsCallable("Just")) { value = &body.Head(); } else if (body.IsCallable({"FlatMap", "OrderedFlatMap"})) { - value = &body.Head(); + if (lambda.GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) { + value = &body; + while(value->IsCallable({"FlatMap", "OrderedFlatMap"})) { + if (value->Head().IsCallable("Member") && &value->Head().Head() == &arg) { + TString field(value->Head().Tail().Content()); + flattenColumns.emplace(value->Tail().Head().HeadPtr().Get(), field); + } + value = &value->Tail().Tail(); + } + if (value->IsCallable("Just")) { + value = &value->Head(); + } else if (value->IsCallable({"OptionalIf", "FlatListIf"})) { + value = &value->Tail(); + } + } else { + value = &body.Head(); + } } else { Warning(body); return; @@ -544,7 +568,7 @@ class TLineageScanner { } lineage.Fields.ConstructInPlace(); - FillStructLineage(lineage, value, arg, innerLineage, GetSeqItemType(body.GetTypeAnn())); + FillStructLineage(lineage, value, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()), flattenColumns); } void HandleAggregate(TLineage& lineage, const TExprNode& node) { @@ -578,12 +602,12 @@ class TLineageScanner { // merge all used fields from init/update handlers auto initHandler = payload->Child(1)->Child(1); auto updateHandler = payload->Child(1)->Child(2); - MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, source, false); - MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, source, false); + MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, source, false, {}); + MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, source, false, {}); } else if (payload->Child(1)->IsCallable("AggApply")) { auto extractHandler = payload->Child(1)->Child(2); bool produceStruct = payload->Child(1)->Head().Content() == "some"; - MergeLineageFromUsedFields(extractHandler->Tail(), extractHandler->Head().Head(), innerLineage, source, produceStruct); + MergeLineageFromUsedFields(extractHandler->Tail(), extractHandler->Head().Head(), innerLineage, source, produceStruct, {}); } else { Warning(*payload->Child(1)); lineage.Fields.Clear(); @@ -612,7 +636,7 @@ class TLineageScanner { } lineage.Fields.ConstructInPlace(); - FillStructLineage(lineage, nullptr, arg, innerLineage, GetSeqItemType(body.GetTypeAnn())); + FillStructLineage(lineage, nullptr, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()), {}); } void HandlePartitionByKeys(TLineage& lineage, const TExprNode& node) { @@ -630,7 +654,7 @@ class TLineageScanner { } lineage.Fields.ConstructInPlace(); - FillStructLineage(lineage, nullptr, arg, innerLineage, GetSeqItemType(body.GetTypeAnn())); + FillStructLineage(lineage, nullptr, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()), {}); } void HandleExtend(TLineage& lineage, const TExprNode& node) { @@ -709,8 +733,8 @@ class TLineageScanner { auto& res = (*lineage.Fields)[sessionColumn->Content()]; const auto& initHandler = node.Child(4)->Child(2); const auto& updateHandler = node.Child(4)->Child(2); - MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res, false); - MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res, false); + MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res, false, {}); + MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res, false, {}); } } @@ -730,12 +754,12 @@ class TLineageScanner { } else if (list->Tail().IsCallable({"Lag","Lead","Rank","DenseRank","PercentRank"})) { const auto& lambda = list->Tail().Child(1); bool produceStruct = list->Tail().IsCallable({"Lag","Lead"}); - MergeLineageFromUsedFields(lambda->Tail(), lambda->Head().Head(), innerLineage, res, produceStruct); + MergeLineageFromUsedFields(lambda->Tail(), lambda->Head().Head(), innerLineage, res, produceStruct, {}); } else if (list->Tail().IsCallable("WindowTraits")) { const auto& initHandler = list->Tail().Child(1); const auto& updateHandler = list->Tail().Child(2); - MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res, false); - MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res, false); + MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res, false, {}); + MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res, false, {}); } else { lineage.Fields.Clear(); return; @@ -850,7 +874,7 @@ class TLineageScanner { if (child->IsCallable("AsStruct")) { for (const auto& f : child->Children()) { TNodeMap> visited; - auto res = ScanExprLineage(f->Tail(), nullptr, nullptr, visited); + auto res = ScanExprLineage(f->Tail(), nullptr, nullptr, visited, {}); if (res) { auto name = f->Head().Content(); (*lineage.Fields)[name].MergeFrom(*res); @@ -858,7 +882,7 @@ class TLineageScanner { } } else { TNodeMap> visited; - auto res = ScanExprLineage(*child, nullptr, nullptr, visited); + auto res = ScanExprLineage(*child, nullptr, nullptr, visited, {}); if (res) { for (const auto& i : structType->GetItems()) { if (i->GetName().StartsWith("_yql_sys_")) { diff --git a/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json index 70562d9bba31..74536f1c5115 100644 --- a/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json @@ -1576,6 +1576,28 @@ } ], "test.test[limit-limit_over_sort_desc_in_subquery--Results]": [], + "test.test[lineage-flatten_by--Analyze]": [ + { + "checksum": "de38a224e0104e35a4d3f64f505d9158", + "size": 7782, + "uri": "https://{canondata_backend}/1773845/4743168c84575c5ee74764d6369a8a7b6f309d6e/resource.tar.gz#test.test_lineage-flatten_by--Analyze_/plan.txt" + } + ], + "test.test[lineage-flatten_by--Debug]": [ + { + "checksum": "df47e3dc178c04a8c1c6fc0a83120230", + "size": 3759, + "uri": "https://{canondata_backend}/1773845/4743168c84575c5ee74764d6369a8a7b6f309d6e/resource.tar.gz#test.test_lineage-flatten_by--Debug_/opt.yql_patched" + } + ], + "test.test[lineage-flatten_by--Plan]": [ + { + "checksum": "de38a224e0104e35a4d3f64f505d9158", + "size": 7782, + "uri": "https://{canondata_backend}/1773845/4743168c84575c5ee74764d6369a8a7b6f309d6e/resource.tar.gz#test.test_lineage-flatten_by--Plan_/plan.txt" + } + ], + "test.test[lineage-flatten_by--Results]": [], "test.test[lineage-grouping_sets--Analyze]": [ { "checksum": "7cd08ec1563a4f59f7b1dd446b2dd421", diff --git a/ydb/library/yql/tests/sql/hybrid_file/part9/canondata/result.json b/ydb/library/yql/tests/sql/hybrid_file/part9/canondata/result.json index cc1da716d5c0..7542520420ab 100644 --- a/ydb/library/yql/tests/sql/hybrid_file/part9/canondata/result.json +++ b/ydb/library/yql/tests/sql/hybrid_file/part9/canondata/result.json @@ -1525,6 +1525,20 @@ "uri": "https://{canondata_backend}/1809005/2a59475dc877549ac4197a291aacd77d92f24ab4/resource.tar.gz#test.test_limit-empty_input_after_limit-default.txt-Plan_/plan.txt" } ], + "test.test[lineage-flatten_by--Debug]": [ + { + "checksum": "ca67fb8416e26fcc6941474954bc8efa", + "size": 3102, + "uri": "https://{canondata_backend}/1900335/8db5941a4ed2bc94d6ae42d0eae7b6c741fa5a59/resource.tar.gz#test.test_lineage-flatten_by--Debug_/opt.yql_patched" + } + ], + "test.test[lineage-flatten_by--Plan]": [ + { + "checksum": "d8e99e1cc64bfe7d765d01c4f3c575e8", + "size": 8880, + "uri": "https://{canondata_backend}/1900335/8db5941a4ed2bc94d6ae42d0eae7b6c741fa5a59/resource.tar.gz#test.test_lineage-flatten_by--Plan_/plan.txt" + } + ], "test.test[match_recognize-alerts_without_order-default.txt-Debug]": [ { "checksum": "acba759d95a9b70640e6418dc1febb2d", diff --git a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json index b9d0e66f3032..aed74e86fbb1 100644 --- a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json +++ b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json @@ -10597,6 +10597,13 @@ "uri": "https://{canondata_backend}/1784826/8212a6594777651314d94a2e2f95179c0016604c/resource.tar.gz#test_sql2yql.test_lineage-error_type_/sql.yql" } ], + "test_sql2yql.test[lineage-flatten_by]": [ + { + "checksum": "a761432fec83da9adc9a7828296bda6f", + "size": 4072, + "uri": "https://{canondata_backend}/1937367/b35833bd1950efa4b6fa264900a396b8f3f198a8/resource.tar.gz#test_sql2yql.test_lineage-flatten_by_/sql.yql" + } + ], "test_sql2yql.test[lineage-flatten_list_nested_lambda]": [ { "checksum": "1405a87aecd4676d7955fff219819b5f", @@ -30253,6 +30260,13 @@ "uri": "https://{canondata_backend}/1784826/8212a6594777651314d94a2e2f95179c0016604c/resource.tar.gz#test_sql_format.test_lineage-error_type_/formatted.sql" } ], + "test_sql_format.test[lineage-flatten_by]": [ + { + "checksum": "3f32f309ac009b3158e11e36cc0a92b7", + "size": 451, + "uri": "https://{canondata_backend}/1937367/b35833bd1950efa4b6fa264900a396b8f3f198a8/resource.tar.gz#test_sql_format.test_lineage-flatten_by_/formatted.sql" + } + ], "test_sql_format.test[lineage-flatten_list_nested_lambda]": [ { "checksum": "3fdec3c3ffc5993a6088aa56eac4fcea", diff --git a/ydb/library/yql/tests/sql/suites/lineage/flatten_by.cfg b/ydb/library/yql/tests/sql/suites/lineage/flatten_by.cfg new file mode 100644 index 000000000000..ed48d1a300c1 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/lineage/flatten_by.cfg @@ -0,0 +1 @@ +in Input input_list_2.txt diff --git a/ydb/library/yql/tests/sql/suites/lineage/flatten_by.sql b/ydb/library/yql/tests/sql/suites/lineage/flatten_by.sql new file mode 100644 index 000000000000..16bc737177bb --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/lineage/flatten_by.sql @@ -0,0 +1,21 @@ +use plato; + +$subquery1 = +SELECT +key, subkey, z +FROM Input +FLATTEN LIST BY value as z; + +$subquery2 = +SELECT +key, subkey, value as z, value2 +FROM Input +FLATTEN LIST BY (value, value2); + +INSERT INTO @tmp1 WITH TRUNCATE +SELECT * +FROM $subquery1; + +INSERT INTO @tmp2 WITH TRUNCATE +SELECT * +FROM $subquery2; \ No newline at end of file diff --git a/ydb/library/yql/tests/sql/suites/lineage/input_list_2.txt b/ydb/library/yql/tests/sql/suites/lineage/input_list_2.txt new file mode 100644 index 000000000000..90e76c01fc9a --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/lineage/input_list_2.txt @@ -0,0 +1,3 @@ +{"key"="075";"subkey"="1";"value"=["abc";"cde"];"value2"=["efg"; "ghj"]}; +{"key"="020";"subkey"="3";"value"=["qqq";"ttt"];"value2"=["ppp";"rrr"]}; + diff --git a/ydb/library/yql/tests/sql/suites/lineage/input_list_2.txt.attr b/ydb/library/yql/tests/sql/suites/lineage/input_list_2.txt.attr new file mode 100644 index 000000000000..26fc5bf42947 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/lineage/input_list_2.txt.attr @@ -0,0 +1,10 @@ +{ + "_yql_row_spec"={ + "Type"=["StructType";[ + ["key";["DataType";"String";];]; + ["subkey";["DataType";"String";];]; + ["value";["ListType";["DataType";"String";];];]; + ["value2";["ListType";["DataType";"String";];];]; + ];]; + } +} diff --git a/ydb/library/yql/tests/sql/yt_native_file/part15/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part15/canondata/result.json index 2a567561c109..b97c03485fe8 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part15/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part15/canondata/result.json @@ -1432,6 +1432,28 @@ "uri": "https://{canondata_backend}/1903280/4c77300cd3fef018d27d7f75b6ff956e63258b21/resource.tar.gz#test.test_limit-limit_over_sort_desc_in_subquery--Results_/results.txt" } ], + "test.test[lineage-flatten_by--Debug]": [ + { + "checksum": "b9673e0336e9f6e9bfa2f44fc8b88803", + "size": 3035, + "uri": "https://{canondata_backend}/1775319/264e08443d41b710cd528563fbaa24c32c366555/resource.tar.gz#test.test_lineage-flatten_by--Debug_/opt.yql" + } + ], + "test.test[lineage-flatten_by--Lineage]": [ + { + "checksum": "f4929578f1fe2fce5f566df8020cc0ad", + "size": 3001, + "uri": "https://{canondata_backend}/1775319/264e08443d41b710cd528563fbaa24c32c366555/resource.tar.gz#test.test_lineage-flatten_by--Lineage_/results.txt" + } + ], + "test.test[lineage-flatten_by--Plan]": [ + { + "checksum": "093e3952e33d4d1b806cce1781e9e189", + "size": 8880, + "uri": "https://{canondata_backend}/1775319/264e08443d41b710cd528563fbaa24c32c366555/resource.tar.gz#test.test_lineage-flatten_by--Plan_/plan.txt" + } + ], + "test.test[lineage-flatten_by--Results]": [], "test.test[lineage-grouping_sets--Debug]": [ { "checksum": "9d2798e2536159bea2cb8dc1a8089078",