Skip to content

Commit

Permalink
YQ-3703 pushdown for extract members in PQ provider (#9939)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Oct 4, 2024
1 parent bc116d9 commit 778d2d7
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@
{"Index": 1, "Name": "Columns", "Type": "TExprBase"},
{"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"},
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"},
{"Index": 5, "Name": "ColumnTypes", "Type": "TExprBase"}
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

TStatus HandleDqTopicSource(TExprBase input, TExprContext& ctx) {
if (!EnsureArgsCount(input.Ref(), 6, ctx)) {
if (!EnsureArgsCount(input.Ref(), 5, ctx)) {
return TStatus::Error;
}

Expand Down
28 changes: 8 additions & 20 deletions ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,17 @@ class TPqDqIntegration: public TDqIntegrationBase {

const auto token = "cluster:default_" + clusterName;

auto rowSchema = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
TExprNode::TListType colTypes;
const auto& typeItems = rowSchema->GetItems();
colTypes.reserve(typeItems.size());
const auto pos = read->Pos(); // TODO
std::transform(typeItems.cbegin(), typeItems.cend(), std::back_inserter(colTypes),
[&](const TItemExprType* item) {
return ctx.NewAtom(pos, FormatType(item->GetItemType()));
});
auto columnTypes = ctx.NewList(pos, std::move(colTypes));

const auto& typeItems = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetItems();
const auto pos = read->Pos();

TExprNode::TListType colNames;
colNames.reserve(typeItems.size());
std::transform(typeItems.cbegin(), typeItems.cend(), std::back_inserter(colNames),
[&](const TItemExprType* item) {
return ctx.NewAtom(pos, item->GetName());
});
auto columnNames = ctx.NewList(pos, std::move(colNames));

auto row = Build<TCoArgument>(ctx, read->Pos())
.Name("row")
.Done();
Expand All @@ -153,7 +145,6 @@ class TPqDqIntegration: public TDqIntegrationBase {
.Build()
.Done().Ptr();


return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TDqPqTopicSource>()
.Topic(pqReadTopic.Topic())
Expand All @@ -163,7 +154,6 @@ class TPqDqIntegration: public TDqIntegrationBase {
.Name().Build(token)
.Build()
.FilterPredicate(emptyPredicate)
.ColumnTypes(std::move(columnTypes))
.Build()
.RowType(ExpandType(pqReadTopic.Pos(), *rowType, ctx))
.DataSource(pqReadTopic.DataSource().Cast<TCoDataSource>())
Expand Down Expand Up @@ -263,14 +253,12 @@ class TPqDqIntegration: public TDqIntegrationBase {
srcDesc.AddMetadataFields(metadata.Value().Maybe<TCoAtom>().Cast().StringValue());
}

for (const auto& column : topicSource.Columns().Cast<TCoAtomList>()) {
srcDesc.AddColumns(column.StringValue());
const auto rowSchema = topic.RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
for (const auto& item : rowSchema->GetItems()) {
srcDesc.AddColumns(TString(item->GetName()));
srcDesc.AddColumnTypes(FormatType(item->GetItemType()));
}

for (const auto& columnTypes : topicSource.ColumnTypes().Cast<TCoAtomList>()) {
srcDesc.AddColumnTypes(columnTypes.StringValue());
}

NYql::NConnector::NApi::TPredicate predicateProto;
if (auto predicate = topicSource.FilterPredicate(); !NYql::IsEmptyFilterPredicate(predicate)) {
TStringBuilder err;
Expand Down
114 changes: 62 additions & 52 deletions ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "yql_pq_provider_impl.h"

#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/core/yql_type_helpers.h>
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
Expand Down Expand Up @@ -30,22 +31,20 @@ namespace {
}
};

std::unordered_set<TString> GetUsedMetadataFields(const TCoExtractMembers& extract) {
std::unordered_set<TString> usedMetadataFields;
for (const auto extractMember : extract.Members()) {
if (FindPqMetaFieldDescriptorBySysColumn(extractMember.StringValue())) {
usedMetadataFields.emplace(extractMember.StringValue());
}
std::unordered_set<TString> GetUsedColumnNames(const TCoExtractMembers& extractMembers) {
std::unordered_set<TString> usedColumnNames;
for (const auto& member : extractMembers.Members()) {
usedColumnNames.emplace(member.StringValue());
}

return usedMetadataFields;
return usedColumnNames;
}

TVector<TCoNameValueTuple> DropUnusedMetadata(const TPqTopic& pqTopic, const std::unordered_set<TString>& usedMetadataFields) {
TVector<TCoNameValueTuple> DropUnusedMetadata(const TPqTopic& pqTopic, const std::unordered_set<TString>& usedColumnNames) {
TVector<TCoNameValueTuple> newSourceMetadata;
for (auto metadataItem : pqTopic.Metadata()) {
auto metadataName = metadataItem.Cast<TCoNameValueTuple>().Value().Maybe<TCoAtom>().Cast().StringValue();
if (usedMetadataFields.contains(metadataName)) {
if (FindPqMetaFieldDescriptorBySysColumn(metadataName) && usedColumnNames.contains(metadataName)) {
newSourceMetadata.push_back(metadataItem);
}
}
Expand Down Expand Up @@ -88,18 +87,18 @@ TCoNameValueTupleList DropUnusedMetadataFromDqWrapSettings(
.Done();
}

TExprNode::TPtr DropUnusedMetadataFieldsFromRowType(
TExprNode::TPtr DropUnusedRowItems(
TPositionHandle position,
const TStructExprType* oldRowType,
const std::unordered_set<TString>& usedMetadataFields,
const std::unordered_set<TString>& usedColumnNames,
TExprContext& ctx)
{
TVector<const TItemExprType*> newFields;
newFields.reserve(oldRowType->GetSize());

for (auto itemExprType : oldRowType->GetItems()) {
const auto columnName = TString(itemExprType->GetName());
if (FindPqMetaFieldDescriptorBySysColumn(columnName) && !usedMetadataFields.contains(columnName)) {
if (!usedColumnNames.contains(columnName)) {
continue;
}

Expand All @@ -109,14 +108,14 @@ TExprNode::TPtr DropUnusedMetadataFieldsFromRowType(
return ExpandType(position, *ctx.MakeType<TStructExprType>(newFields), ctx);
}

TExprNode::TPtr DropUnusedMetadataFieldsFromColumns(
TExprNode::TPtr DropUnusedColumns(
TExprBase oldColumns,
const std::unordered_set<TString>& usedMetadataFields,
const std::unordered_set<TString>& usedColumnNames,
TExprContext& ctx)
{
TExprNode::TListType res;
for (const auto& column : oldColumns.Cast<TCoAtomList>()) {
if (FindPqMetaFieldDescriptorBySysColumn(column.StringValue()) && !usedMetadataFields.contains(column.StringValue())) {
if (!usedColumnNames.contains(column.StringValue())) {
continue;
}

Expand Down Expand Up @@ -160,57 +159,68 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
}*/

TMaybeNode<TExprBase> ExtractMembersOverDqWrap(TExprBase node, TExprContext& ctx) const {
const auto& extract = node.Cast<TCoExtractMembers>();
const auto& input = extract.Input();
const auto dqSourceWrap = input.Maybe<TDqSourceWrap>();
const auto dqPqTopicSource = dqSourceWrap.Input().Maybe<TDqPqTopicSource>();
const auto pqTopic = dqPqTopicSource.Topic().Maybe<TPqTopic>();
if (!pqTopic) {
const auto& extractMembers = node.Cast<TCoExtractMembers>();
const auto& extractMembersInput = extractMembers.Input();
const auto& maybeDqSourceWrap = extractMembersInput.Maybe<TDqSourceWrap>();
if (!maybeDqSourceWrap) {
return node;
}

const auto& dqSourceWrap = maybeDqSourceWrap.Cast();
if (dqSourceWrap.DataSource().Category() != PqProviderName) {
return node;
}

const auto& maybeDqPqTopicSource = dqSourceWrap.Input().Maybe<TDqPqTopicSource>();
if (!maybeDqPqTopicSource) {
return node;
}

const auto usedMetadataFields = GetUsedMetadataFields(extract);
const auto newSourceMetadata = DropUnusedMetadata(pqTopic.Cast(), usedMetadataFields);
if (newSourceMetadata.size() == pqTopic.Metadata().Cast().Size()) {
const auto& dqPqTopicSource = maybeDqPqTopicSource.Cast();
const auto& pqTopic = dqPqTopicSource.Topic();

auto usedColumnNames = GetUsedColumnNames(extractMembers);
const TStructExprType* inputRowType = pqTopic.RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
const TStructExprType* outputRowType = node.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
if (outputRowType->GetSize() == 0 && inputRowType->GetSize() > 0) {
auto item = GetLightColumn(*inputRowType);
YQL_ENSURE(item);
YQL_ENSURE(usedColumnNames.insert(TString(item->GetName())).second);
}

const auto oldRowType = pqTopic.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
if (oldRowType->GetSize() == usedColumnNames.size()) {
return node;
}

const auto oldRowType = pqTopic.Ref().GetTypeAnn()
->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
const auto& newSourceMetadata = DropUnusedMetadata(pqTopic, usedColumnNames);

auto newPqTopicSource = Build<TDqPqTopicSource>(ctx, node.Pos())
.InitFrom(dqPqTopicSource.Cast())
const TExprNode::TPtr newPqTopicSource = Build<TDqPqTopicSource>(ctx, dqPqTopicSource.Pos())
.InitFrom(dqPqTopicSource)
.Topic<TPqTopic>()
.InitFrom(pqTopic.Cast())
.InitFrom(pqTopic)
.Metadata().Add(newSourceMetadata).Build()
.Build();

if (dqPqTopicSource.Columns()) {
auto newColumns = DropUnusedMetadataFieldsFromColumns(
dqPqTopicSource.Columns().Cast(),
usedMetadataFields,
ctx);
newPqTopicSource.Columns(newColumns);
}
.RowSpec(DropUnusedRowItems(pqTopic.RowSpec().Pos(), inputRowType, usedColumnNames, ctx))
.Build()
.Columns(DropUnusedColumns(dqPqTopicSource.Columns(), usedColumnNames, ctx))
.Done()
.Ptr();

const auto newDqSourceWrap = Build<TDqSourceWrap>(ctx, node.Pos())
.InitFrom(dqSourceWrap.Cast())
.Input(newPqTopicSource.Done())
.Settings(DropUnusedMetadataFromDqWrapSettings(
dqSourceWrap.Cast(),
newSourceMetadata,
ctx))
.RowType(DropUnusedMetadataFieldsFromRowType(
node.Pos(),
oldRowType,
usedMetadataFields,
ctx))
const TExprNode::TPtr newDqSourceWrap = Build<TDqSourceWrap>(ctx, dqSourceWrap.Pos())
.InitFrom(dqSourceWrap)
.Input(newPqTopicSource)
.Settings(DropUnusedMetadataFromDqWrapSettings(dqSourceWrap, newSourceMetadata, ctx))
.RowType(DropUnusedRowItems(dqSourceWrap.RowType().Pos(), oldRowType, usedColumnNames, ctx))
.Done()
.Ptr();

if (outputRowType->GetSize() == usedColumnNames.size()) {
return newDqSourceWrap;
}

return Build<TCoExtractMembers>(ctx, node.Pos())
.InitFrom(extract)
.Input(ctx.ReplaceNode(input.Ptr(), dqSourceWrap.Ref(), newDqSourceWrap))
.InitFrom(extractMembers)
.Input(ctx.ReplaceNode(extractMembersInput.Ptr(), dqSourceWrap.Ref(), newDqSourceWrap))
.Done();
}

Expand Down

0 comments on commit 778d2d7

Please sign in to comment.