Skip to content

Commit

Permalink
Merge f854433 into 1aab418
Browse files Browse the repository at this point in the history
  • Loading branch information
APozdniakov authored Oct 15, 2024
2 parents 1aab418 + f854433 commit e264891
Show file tree
Hide file tree
Showing 18 changed files with 88 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "ClSourceSettings"},
"Children": [
{"Index": 0, "Name": "Table", "Type": "TCoAtom"},
{"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "Columns", "Type": "TCoAtomList"}
]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ class TClickHouseDataSourceTypeAnnotationTransformer : public TVisitorTransforme
}

TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 3U, ctx)) {
if (!EnsureArgsCount(*input, 4, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TClSourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {

return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TClSourceSettings>()
.World(clReadTable.World())
.Table(clReadTable.Table())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "GenSourceSettings"},
"Children": [
{"Index": 0, "Name": "Cluster", "Type": "TCoAtom"},
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "Columns", "Type": "TCoAtomList"},
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Cluster", "Type": "TCoAtom"},
{"Index": 2, "Name": "Table", "Type": "TCoAtom"},
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 4, "Name": "Columns", "Type": "TCoAtomList"},
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoLambda"}
]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ namespace NYql {
}

TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 5, ctx)) {
if (!EnsureArgsCount(*input, 6, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TGenSourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ namespace NYql {
// clang-format off
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TGenSourceSettings>()
.World(genReadTable.World())
.Cluster(genReadTable.DataSource().Cluster())
.Table(genReadTable.Table())
.Token<TCoSecureParam>()
Expand Down
11 changes: 6 additions & 5 deletions ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "DqPqTopicSource"},
"Children": [
{"Index": 0, "Name": "Topic", "Type": "TPqTopic"},
{"Index": 1, "Name": "Columns", "Type": "TExprBase"},
{"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"},
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Topic", "Type": "TPqTopic"},
{"Index": 2, "Name": "Columns", "Type": "TExprBase"},
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"},
{"Index": 4, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoLambda"}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,16 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

TStatus HandleDqTopicSource(TExprBase input, TExprContext& ctx) {
if (!EnsureArgsCount(input.Ref(), 5, ctx)) {
if (!EnsureArgsCount(input.Ref(), 6, ctx)) {
return TStatus::Error;
}

TDqPqTopicSource topicSource = input.Cast<TDqPqTopicSource>();

if (!EnsureWorldType(topicSource.World().Ref(), ctx)) {
return TStatus::Error;
}

TPqTopic topic = topicSource.Topic();

if (!EnsureCallable(topic.Ref(), ctx)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class TPqDqIntegration: public TDqIntegrationBase {

return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TDqPqTopicSource>()
.World(pqReadTopic.World())
.Topic(pqReadTopic.Topic())
.Columns(std::move(columnNames))
.Settings(BuildTopicReadSettings(clusterName, dqSettings, read->Pos(), format, ctx))
Expand Down
23 changes: 12 additions & 11 deletions ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,32 @@
"Base": "TCallable",
"Match": {"Type": "CallableBase"},
"Children": [
{"Index": 0, "Name": "Paths", "Type": "TS3Paths"},
{"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 2, "Name": "RowsLimitHint", "Type": "TCoAtom"},
{"Index": 3, "Name": "Path", "Type": "TCoAtom"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Paths", "Type": "TS3Paths"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "RowsLimitHint", "Type": "TCoAtom"},
{"Index": 4, "Name": "Path", "Type": "TCoAtom"}
]
},
{
"Name": "TS3SourceSettings",
"Base": "TS3SourceSettingsBase",
"Match": {"Type": "Callable", "Name": "S3SourceSettings"},
"Children": [
{"Index": 4, "Name": "SizeLimit", "Type": "TExprBase", "Optional": true},
{"Index": 5, "Name": "PathPattern", "Type": "TExprBase", "Optional": true},
{"Index": 6, "Name": "PathPatternVariant", "Type": "TExprBase", "Optional": true}
{"Index": 5, "Name": "SizeLimit", "Type": "TExprBase", "Optional": true},
{"Index": 6, "Name": "PathPattern", "Type": "TExprBase", "Optional": true},
{"Index": 7, "Name": "PathPatternVariant", "Type": "TExprBase", "Optional": true}
]
},
{
"Name": "TS3ParseSettings",
"Base": "TS3SourceSettingsBase",
"Match": {"Type": "Callable", "Name": "S3ParseSettings"},
"Children": [
{"Index": 4, "Name": "Format", "Type": "TCoAtom"},
{"Index": 5, "Name": "RowType", "Type": "TExprBase"},
{"Index": 6, "Name": "FilterPredicate", "Type": "TCoLambda"},
{"Index": 7, "Name": "Settings", "Type": "TExprBase", "Optional": true}
{"Index": 5, "Name": "Format", "Type": "TCoAtom"},
{"Index": 6, "Name": "RowType", "Type": "TExprBase"},
{"Index": 7, "Name": "FilterPredicate", "Type": "TCoLambda"},
{"Index": 8, "Name": "Settings", "Type": "TExprBase", "Optional": true}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,11 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

TStatus HandleS3SourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureMinArgsCount(*input, 4U, ctx)) {
if (!EnsureMinMaxArgsCount(*input, 5, 8, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TS3SourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down Expand Up @@ -335,7 +339,11 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

TStatus HandleS3ParseSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*input, 7U, 8U, ctx)) {
if (!EnsureMinMaxArgsCount(*input, 8, 9, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TS3ParseSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && format != "raw" && format != "json_list") {
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3ParseSettings>()
.World(s3ReadObject.World())
.Paths(s3ReadObject.Object().Paths())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down Expand Up @@ -331,6 +332,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
auto emptyNode = Build<TCoVoid>(ctx, read->Pos()).Done().Ptr();
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3SourceSettings>()
.World(s3ReadObject.World())
.Paths(s3ReadObject.Object().Paths())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,19 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "SoSourceSettings"},
"Children": [
{"Index": 0, "Name": "Project", "Type": "TCoAtom"},
{"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 2, "Name": "RowType", "Type": "TExprBase"},
{"Index": 3, "Name": "SystemColumns", "Type": "TCoAtomList"},
{"Index": 4, "Name": "LabelNames", "Type": "TCoAtomList"},
{"Index": 5, "Name": "From", "Type": "TCoAtom"},
{"Index": 6, "Name": "To", "Type": "TCoAtom"},
{"Index": 7, "Name": "Program", "Type": "TCoAtom"},
{"Index": 8, "Name": "DownsamplingDisabled", "Type": "TCoBool"},
{"Index": 9, "Name": "DownsamplingAggregation", "Type": "TCoAtom"},
{"Index": 10, "Name": "DownsamplingFill", "Type": "TCoAtom"},
{"Index": 11, "Name": "DownsamplingGridSec", "Type": "TCoUint32"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Project", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "RowType", "Type": "TExprBase"},
{"Index": 4, "Name": "SystemColumns", "Type": "TCoAtomList"},
{"Index": 5, "Name": "LabelNames", "Type": "TCoAtomList"},
{"Index": 6, "Name": "From", "Type": "TCoAtom"},
{"Index": 7, "Name": "To", "Type": "TCoAtom"},
{"Index": 8, "Name": "Program", "Type": "TCoAtom"},
{"Index": 9, "Name": "DownsamplingDisabled", "Type": "TCoBool"},
{"Index": 10, "Name": "DownsamplingAggregation", "Type": "TCoAtom"},
{"Index": 11, "Name": "DownsamplingFill", "Type": "TCoAtom"},
{"Index": 12, "Name": "DownsamplingGridSec", "Type": "TCoUint32"}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
}

TStatus HandleSoSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 12U, ctx)) {
if (!EnsureArgsCount(*input, 13, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TSoSourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase {

return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TSoSourceSettings>()
.World(soReadObject.World())
.Project(soReadObject.Object().Project())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "YdbSourceSettings"},
"Children": [
{"Index": 0, "Name": "Table", "Type": "TCoAtom"},
{"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "Columns", "Type": "TCoAtomList"}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ class TYdbDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {

TStatus HandleYdbSourceSettings(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
Y_UNUSED(output);
if (!EnsureArgsCount(*input, 3U, ctx)) {
if (!EnsureArgsCount(*input, 4, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TYdbSourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class TYdbDqIntegration: public TDqIntegrationBase {

return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TYdbSourceSettings>()
.World(ydbReadTable.World())
.Table(ydbReadTable.Table())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down

0 comments on commit e264891

Please sign in to comment.