diff --git a/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json b/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json index 8a8f172d307f..0b178695aaeb 100644 --- a/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json +++ b/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json @@ -71,8 +71,7 @@ {"Index": 1, "Name": "Columns", "Type": "TExprBase"}, {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"}, {"Index": 3, "Name": "Token", "Type": "TCoSecureParam"}, - {"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}, - {"Index": 5, "Name": "ColumnTypes", "Type": "TExprBase"} + {"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"} ] }, { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp index 11d6194e51c5..ad404949ed03 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp @@ -132,7 +132,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { } TStatus HandleDqTopicSource(TExprBase input, TExprContext& ctx) { - if (!EnsureArgsCount(input.Ref(), 6, ctx)) { + if (!EnsureArgsCount(input.Ref(), 5, ctx)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index bac0ba92fbc7..01ef5f61b391 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -124,17 +124,9 @@ class TPqDqIntegration: public TDqIntegrationBase { const auto token = "cluster:default_" + clusterName; - auto rowSchema = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast()->GetType()->Cast(); - TExprNode::TListType colTypes; - const auto& typeItems = rowSchema->GetItems(); - colTypes.reserve(typeItems.size()); - const auto pos = read->Pos(); // TODO - std::transform(typeItems.cbegin(), typeItems.cend(), std::back_inserter(colTypes), - [&](const TItemExprType* item) { - return ctx.NewAtom(pos, FormatType(item->GetItemType())); - }); - auto columnTypes = ctx.NewList(pos, std::move(colTypes)); - + const auto& typeItems = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast()->GetType()->Cast()->GetItems(); + const auto pos = read->Pos(); + TExprNode::TListType colNames; colNames.reserve(typeItems.size()); std::transform(typeItems.cbegin(), typeItems.cend(), std::back_inserter(colNames), @@ -142,7 +134,7 @@ class TPqDqIntegration: public TDqIntegrationBase { return ctx.NewAtom(pos, item->GetName()); }); auto columnNames = ctx.NewList(pos, std::move(colNames)); - + auto row = Build(ctx, read->Pos()) .Name("row") .Done(); @@ -153,7 +145,6 @@ class TPqDqIntegration: public TDqIntegrationBase { .Build() .Done().Ptr(); - return Build(ctx, read->Pos()) .Input() .Topic(pqReadTopic.Topic()) @@ -163,7 +154,6 @@ class TPqDqIntegration: public TDqIntegrationBase { .Name().Build(token) .Build() .FilterPredicate(emptyPredicate) - .ColumnTypes(std::move(columnTypes)) .Build() .RowType(ExpandType(pqReadTopic.Pos(), *rowType, ctx)) .DataSource(pqReadTopic.DataSource().Cast()) @@ -263,14 +253,12 @@ class TPqDqIntegration: public TDqIntegrationBase { srcDesc.AddMetadataFields(metadata.Value().Maybe().Cast().StringValue()); } - for (const auto& column : topicSource.Columns().Cast()) { - srcDesc.AddColumns(column.StringValue()); + const auto rowSchema = topic.RowSpec().Ref().GetTypeAnn()->Cast()->GetType()->Cast(); + for (const auto& item : rowSchema->GetItems()) { + srcDesc.AddColumns(TString(item->GetName())); + srcDesc.AddColumnTypes(FormatType(item->GetItemType())); } - for (const auto& columnTypes : topicSource.ColumnTypes().Cast()) { - srcDesc.AddColumnTypes(columnTypes.StringValue()); - } - NYql::NConnector::NApi::TPredicate predicateProto; if (auto predicate = topicSource.FilterPredicate(); !NYql::IsEmptyFilterPredicate(predicate)) { TStringBuilder err; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp index 92964948185a..b78c94e7705a 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp @@ -1,6 +1,7 @@ #include "yql_pq_provider_impl.h" #include +#include #include #include #include @@ -30,22 +31,20 @@ namespace { } }; -std::unordered_set GetUsedMetadataFields(const TCoExtractMembers& extract) { - std::unordered_set usedMetadataFields; - for (const auto extractMember : extract.Members()) { - if (FindPqMetaFieldDescriptorBySysColumn(extractMember.StringValue())) { - usedMetadataFields.emplace(extractMember.StringValue()); - } +std::unordered_set GetUsedColumnNames(const TCoExtractMembers& extractMembers) { + std::unordered_set usedColumnNames; + for (const auto& member : extractMembers.Members()) { + usedColumnNames.emplace(member.StringValue()); } - return usedMetadataFields; + return usedColumnNames; } -TVector DropUnusedMetadata(const TPqTopic& pqTopic, const std::unordered_set& usedMetadataFields) { +TVector DropUnusedMetadata(const TPqTopic& pqTopic, const std::unordered_set& usedColumnNames) { TVector newSourceMetadata; for (auto metadataItem : pqTopic.Metadata()) { auto metadataName = metadataItem.Cast().Value().Maybe().Cast().StringValue(); - if (usedMetadataFields.contains(metadataName)) { + if (FindPqMetaFieldDescriptorBySysColumn(metadataName) && usedColumnNames.contains(metadataName)) { newSourceMetadata.push_back(metadataItem); } } @@ -88,10 +87,10 @@ TCoNameValueTupleList DropUnusedMetadataFromDqWrapSettings( .Done(); } -TExprNode::TPtr DropUnusedMetadataFieldsFromRowType( +TExprNode::TPtr DropUnusedRowItems( TPositionHandle position, const TStructExprType* oldRowType, - const std::unordered_set& usedMetadataFields, + const std::unordered_set& usedColumnNames, TExprContext& ctx) { TVector newFields; @@ -99,7 +98,7 @@ TExprNode::TPtr DropUnusedMetadataFieldsFromRowType( for (auto itemExprType : oldRowType->GetItems()) { const auto columnName = TString(itemExprType->GetName()); - if (FindPqMetaFieldDescriptorBySysColumn(columnName) && !usedMetadataFields.contains(columnName)) { + if (!usedColumnNames.contains(columnName)) { continue; } @@ -109,14 +108,14 @@ TExprNode::TPtr DropUnusedMetadataFieldsFromRowType( return ExpandType(position, *ctx.MakeType(newFields), ctx); } -TExprNode::TPtr DropUnusedMetadataFieldsFromColumns( +TExprNode::TPtr DropUnusedColumns( TExprBase oldColumns, - const std::unordered_set& usedMetadataFields, + const std::unordered_set& usedColumnNames, TExprContext& ctx) { TExprNode::TListType res; for (const auto& column : oldColumns.Cast()) { - if (FindPqMetaFieldDescriptorBySysColumn(column.StringValue()) && !usedMetadataFields.contains(column.StringValue())) { + if (!usedColumnNames.contains(column.StringValue())) { continue; } @@ -160,57 +159,68 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase { }*/ TMaybeNode ExtractMembersOverDqWrap(TExprBase node, TExprContext& ctx) const { - const auto& extract = node.Cast(); - const auto& input = extract.Input(); - const auto dqSourceWrap = input.Maybe(); - const auto dqPqTopicSource = dqSourceWrap.Input().Maybe(); - const auto pqTopic = dqPqTopicSource.Topic().Maybe(); - if (!pqTopic) { + const auto& extractMembers = node.Cast(); + const auto& extractMembersInput = extractMembers.Input(); + const auto& maybeDqSourceWrap = extractMembersInput.Maybe(); + if (!maybeDqSourceWrap) { + return node; + } + + const auto& dqSourceWrap = maybeDqSourceWrap.Cast(); + if (dqSourceWrap.DataSource().Category() != PqProviderName) { + return node; + } + + const auto& maybeDqPqTopicSource = dqSourceWrap.Input().Maybe(); + if (!maybeDqPqTopicSource) { return node; } - const auto usedMetadataFields = GetUsedMetadataFields(extract); - const auto newSourceMetadata = DropUnusedMetadata(pqTopic.Cast(), usedMetadataFields); - if (newSourceMetadata.size() == pqTopic.Metadata().Cast().Size()) { + const auto& dqPqTopicSource = maybeDqPqTopicSource.Cast(); + const auto& pqTopic = dqPqTopicSource.Topic(); + + auto usedColumnNames = GetUsedColumnNames(extractMembers); + const TStructExprType* inputRowType = pqTopic.RowSpec().Ref().GetTypeAnn()->Cast()->GetType()->Cast(); + const TStructExprType* outputRowType = node.Ref().GetTypeAnn()->Cast()->GetItemType()->Cast(); + if (outputRowType->GetSize() == 0 && inputRowType->GetSize() > 0) { + auto item = GetLightColumn(*inputRowType); + YQL_ENSURE(item); + YQL_ENSURE(usedColumnNames.insert(TString(item->GetName())).second); + } + + const auto oldRowType = pqTopic.Ref().GetTypeAnn()->Cast()->GetItemType()->Cast(); + if (oldRowType->GetSize() == usedColumnNames.size()) { return node; } - const auto oldRowType = pqTopic.Ref().GetTypeAnn() - ->Cast()->GetItemType()->Cast(); + const auto& newSourceMetadata = DropUnusedMetadata(pqTopic, usedColumnNames); - auto newPqTopicSource = Build(ctx, node.Pos()) - .InitFrom(dqPqTopicSource.Cast()) + const TExprNode::TPtr newPqTopicSource = Build(ctx, dqPqTopicSource.Pos()) + .InitFrom(dqPqTopicSource) .Topic() - .InitFrom(pqTopic.Cast()) + .InitFrom(pqTopic) .Metadata().Add(newSourceMetadata).Build() - .Build(); - - if (dqPqTopicSource.Columns()) { - auto newColumns = DropUnusedMetadataFieldsFromColumns( - dqPqTopicSource.Columns().Cast(), - usedMetadataFields, - ctx); - newPqTopicSource.Columns(newColumns); - } + .RowSpec(DropUnusedRowItems(pqTopic.RowSpec().Pos(), inputRowType, usedColumnNames, ctx)) + .Build() + .Columns(DropUnusedColumns(dqPqTopicSource.Columns(), usedColumnNames, ctx)) + .Done() + .Ptr(); - const auto newDqSourceWrap = Build(ctx, node.Pos()) - .InitFrom(dqSourceWrap.Cast()) - .Input(newPqTopicSource.Done()) - .Settings(DropUnusedMetadataFromDqWrapSettings( - dqSourceWrap.Cast(), - newSourceMetadata, - ctx)) - .RowType(DropUnusedMetadataFieldsFromRowType( - node.Pos(), - oldRowType, - usedMetadataFields, - ctx)) + const TExprNode::TPtr newDqSourceWrap = Build(ctx, dqSourceWrap.Pos()) + .InitFrom(dqSourceWrap) + .Input(newPqTopicSource) + .Settings(DropUnusedMetadataFromDqWrapSettings(dqSourceWrap, newSourceMetadata, ctx)) + .RowType(DropUnusedRowItems(dqSourceWrap.RowType().Pos(), oldRowType, usedColumnNames, ctx)) .Done() .Ptr(); + if (outputRowType->GetSize() == usedColumnNames.size()) { + return newDqSourceWrap; + } + return Build(ctx, node.Pos()) - .InitFrom(extract) - .Input(ctx.ReplaceNode(input.Ptr(), dqSourceWrap.Ref(), newDqSourceWrap)) + .InitFrom(extractMembers) + .Input(ctx.ReplaceNode(extractMembersInput.Ptr(), dqSourceWrap.Ref(), newDqSourceWrap)) .Done(); }