Commit 6cf23b2

dtenedor committed Nov 23, 2024
1 parent d5da49d
Showing 6 changed files with 416 additions and 1 deletion.
@@ -1504,6 +1504,7 @@ version
operatorPipeRightSide
: selectClause windowClause?
| EXTEND extendList=namedExpressionSeq
| SET operatorPipeSetAssignmentSeq
// Note that the WINDOW clause is not allowed in the WHERE pipe operator, but we add it here in
// the grammar simply for purposes of catching this invalid syntax and throwing a specific
// dedicated error message.
@@ -1520,6 +1521,11 @@ operatorPipeRightSide
| AGGREGATE namedExpressionSeq? aggregationClause?
;

operatorPipeSetAssignmentSeq
: ident+=errorCapturingIdentifier EQ expression
(COMMA ident+=errorCapturingIdentifier EQ expression)*
;
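// For example (illustrative only, not part of this commit), this rule matches the
// comma-separated assignment list in:
//   TABLE t
//   |> SET x = 1, y = x + 1;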

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
// - Reserved keywords:
// Keywords that are reserved and can't be used as identifiers for table, view, column,
@@ -67,6 +67,7 @@ object PipeOperators
val offsetClause = "OFFSET"
val orderByClause = "ORDER BY"
val selectClause = "SELECT"
val setClause = "SET"
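// Clause name used to tag PipeExpression nodes built for |> SET targets (see AstBuilder).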
val sortByClause = "SORT BY"
val sortByDistributeByClause = "SORT BY ... DISTRIBUTE BY ..."
val windowClause = "WINDOW"
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer, Set}
import scala.jdk.CollectionConverters._
import scala.util.{Left, Right}
@@ -5952,6 +5953,8 @@ class AstBuilder extends DataTypeAstBuilder
}.get
val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) ++ extendExpressions
Project(projectList, left)
}.getOrElse(Option(ctx.SET).map { _ =>
visitOperatorPipeSet(ctx, left)
}.getOrElse(Option(ctx.whereClause).map { c =>
if (ctx.windowClause() != null) {
throw QueryParsingErrors.windowClauseInPipeOperatorWhereClauseNotAllowedError(ctx)
@@ -5978,7 +5981,49 @@
withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true)
}.getOrElse(
visitOperatorPipeAggregate(ctx, left)
)))))))))
))))))))))
}

private def visitOperatorPipeSet(
ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = {
val (setIdentifiers: Seq[String], setTargets: Seq[Expression]) =
visitOperatorPipeSetAssignmentSeq(ctx.operatorPipeSetAssignmentSeq())
var plan = left
val visitedSetIdentifiers = mutable.Set.empty[String]
setIdentifiers.zip(setTargets).foreach {
case (_, _: Alias) =>
operationNotAllowed(
"SQL pipe syntax |> SET operator with an alias assigned with [AS] aliasName", ctx)
case (ident, target) =>
// Check uniqueness of the assignment keys, respecting spark.sql.caseSensitive:
// normalize keys to lowercase only when analysis is case-insensitive.
val checkKey = if (SQLConf.get.caseSensitiveAnalysis) {
ident
} else {
ident.toLowerCase(Locale.ROOT)
}
if (visitedSetIdentifiers(checkKey)) {
operationNotAllowed(
s"SQL pipe syntax |> SET operator with duplicate assignment key $ident", ctx)
}
visitedSetIdentifiers += checkKey
// Add an UnresolvedStarExcept to exclude the SET column name from the input relation's
// output, and append the new SET expression to the projection list under that name.
// Wrap the target in a PipeExpression to make sure it does not contain any aggregate functions.
val projectList: Seq[NamedExpression] =
Seq(UnresolvedStarExcept(None, Seq(Seq(ident))),
Alias(PipeExpression(target, isAggregate = false, PipeOperators.setClause), ident)())
plan = Project(projectList, plan)
}
plan
}
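
// To illustrate the desugaring: each SET assignment becomes a Project that excludes the old
// column and re-appends the new value under the same name. For example,
//   TABLE t |> SET y = x
// behaves roughly like
//   SELECT * EXCEPT (y), x AS y FROM t
// (hence the SET column appears at the end of the output schema in the golden results below).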

override def visitOperatorPipeSetAssignmentSeq(
ctx: OperatorPipeSetAssignmentSeqContext): (Seq[String], Seq[Expression]) = {
withOrigin(ctx) {
val setIdentifiers: Seq[String] = ctx.errorCapturingIdentifier().asScala.map(_.getText).toSeq
val setTargets: Seq[Expression] = ctx.expression().asScala.map(typedVisit[Expression]).toSeq
(setIdentifiers, setTargets)
}
}
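
// For example, the assignment list "a = 1, b = a + 1" yields identifiers Seq("a", "b")
// paired positionally with the parsed expressions for 1 and a + 1.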

private def visitOperatorPipeAggregate(
@@ -731,6 +731,161 @@ org.apache.spark.sql.AnalysisException
}


-- !query
table t
|> set x = 1
-- !query analysis
Project [y#x, pipeexpression(1, false, SET) AS x#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> set y = x
-- !query analysis
Project [x#x, pipeexpression(x#x, false, SET) AS y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> extend 1 as z
|> set z = x + length(y)
-- !query analysis
Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, SET) AS z#x]
+- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> extend 1 as z
|> extend 2 as zz
|> set z = x + length(y), zz = x + 1
-- !query analysis
Project [x#x, y#x, z#x, pipeexpression((x#x + 1), false, SET) AS zz#x]
+- Project [x#x, y#x, zz#x, pipeexpression((x#x + length(y#x)), false, SET) AS z#x]
+- Project [x#x, y#x, z#x, pipeexpression(2, false, EXTEND) AS zz#x]
+- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> extend 1 as z
|> set z = x + length(y)
|> set z = z + 1
-- !query analysis
Project [x#x, y#x, pipeexpression((z#x + 1), false, SET) AS z#x]
+- Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, SET) AS z#x]
+- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
select col from st
|> extend 1 as z
|> set z = col.i1
-- !query analysis
Project [col#x, pipeexpression(col#x.i1, false, SET) AS z#x]
+- Project [col#x, pipeexpression(1, false, EXTEND) AS z#x]
+- Project [col#x]
+- SubqueryAlias spark_catalog.default.st
+- Relation spark_catalog.default.st[x#x,col#x] parquet


-- !query
table t
|> set y = (select a from other where x = a limit 1)
-- !query analysis
Project [x#x, pipeexpression(scalar-subquery#x [x#x], false, SET) AS y#x]
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Project [a#x]
: +- Filter (outer(x#x) = a#x)
: +- SubqueryAlias spark_catalog.default.other
: +- Relation spark_catalog.default.other[a#x,b#x] json
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> extend 1 as z
|> set z = first_value(x) over (partition by y)
-- !query analysis
Project [x#x, y#x, z#x]
+- Project [x#x, y#x, _we0#x, pipeexpression(_we0#x, false, SET) AS z#x]
+- Window [first_value(x#x, false) windowspecdefinition(y#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#x], [y#x]
+- Project [x#x, y#x]
+- Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> set z = 1
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`z`",
"proposal" : "`x`, `y`"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 1,
"stopIndex" : 20,
"fragment" : "table t\n|> set z = 1"
} ]
}


-- !query
table t
|> set x = 1 as z
-- !query analysis
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "PARSE_SYNTAX_ERROR",
"sqlState" : "42601",
"messageParameters" : {
"error" : "'as'",
"hint" : ""
}
}


-- !query
table t
|> extend 1 as z
|> set z = x + length(y), z = z + 1
-- !query analysis
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "_LEGACY_ERROR_TEMP_0035",
"messageParameters" : {
"message" : "SQL pipe syntax |> SET operator with duplicate assignment key z"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 1,
"stopIndex" : 60,
"fragment" : "table t\n|> extend 1 as z\n|> set z = x + length(y), z = z + 1"
} ]
}


-- !query
table t
|> where true
58 changes: 58 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -241,6 +241,64 @@ table t
table t
|> extend *;

-- SET operators: positive tests.
---------------------------------

-- Setting with a constant.
table t
|> set x = 1;

-- Setting with an attribute.
table t
|> set y = x;

-- Setting with an expression.
table t
|> extend 1 as z
|> set z = x + length(y);

-- Setting two columns in a single SET clause.
table t
|> extend 1 as z
|> extend 2 as zz
|> set z = x + length(y), zz = x + 1;

-- Setting the same column twice in sequence.
table t
|> extend 1 as z
|> set z = x + length(y)
|> set z = z + 1;

-- Setting with a struct field.
select col from st
|> extend 1 as z
|> set z = col.i1;

-- Setting with a subquery.
table t
|> set y = (select a from other where x = a limit 1);

-- Window functions are allowed in the pipe operator SET list.
table t
|> extend 1 as z
|> set z = first_value(x) over (partition by y);

-- SET operators: negative tests.
---------------------------------

-- SET with a column name that does not exist in the input relation.
table t
|> set z = 1;

-- SET with an alias.
table t
|> set x = 1 as z;

-- SET assignments with duplicate keys.
table t
|> extend 1 as z
|> set z = x + length(y), z = z + 1;

-- WHERE operators: positive tests.
-----------------------------------
