Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
maropu committed Dec 28, 2017
1 parent fbe266c commit 179c6fd
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -680,9 +680,8 @@ object TypeCoercion {
// Skip nodes if unresolved or empty children
case c @ Concat(children) if !c.childrenResolved || children.isEmpty => c

case c @ Concat(children) if !children.map(_.dataType).forall(_ == BinaryType) =>
typeCastToString(c)
case c @ Concat(children) if conf.concatBinaryAsString =>
case c @ Concat(children) if conf.concatBinaryAsString ||
!children.map(_.dataType).forall(_ == BinaryType) =>
typeCastToString(c)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,16 +646,17 @@ object CombineConcats extends Rule[LogicalPlan] {
stack.pop() match {
case Concat(children) =>
stack.pushAll(children.reverse)
case Cast(Concat(children), StringType, _) =>
stack.pushAll(children.reverse)
// If `spark.sql.function.concatBinaryAsString` is false, nested `Concat` exprs may contain
// `Concat`s with binary output. Since `TypeCoercion` casts them to strings, we also need to
// handle this case in order to combine all nested `Concat`s.
case c @ Cast(Concat(children), StringType, _) =>
val newChildren = children.map { e => c.copy(child = e) }
stack.pushAll(newChildren.reverse)
case child =>
flattened += child
}
}
val newChildren = flattened.map { e =>
ImplicitTypeCasts.implicitCast(e, StringType).getOrElse(e)
}
Concat(newChildren)
Concat(flattened)
}

def apply(plan: LogicalPlan): LogicalPlan = plan.transformExpressionsDown {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.StringType


class CombineConcatsSuite extends PlanTest {
Expand All @@ -37,8 +36,10 @@ class CombineConcatsSuite extends PlanTest {
comparePlans(actual, correctAnswer)
}

// Test helper: wraps the given string in a `Literal` expression.
def str(s: String): Literal = Literal(s)
// Test helper: wraps the bytes of the given string in a `Literal` expression.
// NOTE(review): the no-arg `getBytes` encodes with the platform default charset.
def binary(s: String): Literal = Literal(s.getBytes)

test("combine nested Concat exprs") {
def str(s: String): Literal = Literal(s, StringType)
assertEquivalent(
Concat(
Concat(str("a") :: str("b") :: Nil) ::
Expand Down Expand Up @@ -72,4 +73,13 @@ class CombineConcatsSuite extends PlanTest {
Nil),
Concat(str("a") :: str("b") :: str("c") :: str("d") :: Nil))
}

// A `Concat` of two nested `Concat`s (one over string literals, one over binary literals)
// is expected to be equivalent to a single flat `Concat` over all four literals.
// NOTE(review): `assertEquivalent` is only partly visible here; presumably it optimizes the
// first expression and compares the resulting plan with the second -- confirm.
test("combine string and binary exprs") {
assertEquivalent(
Concat(
Concat(str("a") :: str("b") :: Nil) ::
Concat(binary("c") :: binary("d") :: Nil) ::
Nil),
Concat(str("a") :: str("b") :: binary("c") :: binary("d") :: Nil))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ select right(null, -2), right("abcd", -2), right("abcd", 0), right("abcd", 'a');
set spark.sql.function.concatBinaryAsString=false;

-- Check that Catalyst combines nested `Concat`s when concatBinaryAsString=false
EXPLAIN EXTENDED SELECT ((col1 || col2) || (col3 || col4)) col
EXPLAIN SELECT ((col1 || col2) || (col3 || col4)) col
FROM (
SELECT
string(id) col1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ spark.sql.function.concatBinaryAsString false


-- !query 13
EXPLAIN EXTENDED SELECT ((col1 || col2) || (col3 || col4)) col
EXPLAIN SELECT ((col1 || col2) || (col3 || col4)) col
FROM (
SELECT
string(id) col1,
Expand All @@ -141,23 +141,6 @@ FROM (
-- !query 13 schema
struct<plan:string>
-- !query 13 output
== Parsed Logical Plan ==
'Project [concat(concat('col1, 'col2), concat('col3, 'col4)) AS col#x]
+- 'SubqueryAlias __auto_generated_subquery_name
+- 'Project ['string('id) AS col1#x, 'string(('id + 1)) AS col2#x, 'encode('string(('id + 2)), utf-8) AS col3#x, 'encode('string(('id + 3)), utf-8) AS col4#x]
+- 'UnresolvedTableValuedFunction range, [10]

== Analyzed Logical Plan ==
col: string
Project [concat(concat(col1#x, col2#x), cast(concat(col3#x, col4#x) as string)) AS col#x]
+- SubqueryAlias __auto_generated_subquery_name
+- Project [cast(id#xL as string) AS col1#x, cast((id#xL + cast(1 as bigint)) as string) AS col2#x, encode(cast((id#xL + cast(2 as bigint)) as string), utf-8) AS col3#x, encode(cast((id#xL + cast(3 as bigint)) as string), utf-8) AS col4#x]
+- Range (0, 10, step=1, splits=None)

== Optimized Logical Plan ==
Project [concat(cast(id#xL as string), cast((id#xL + 1) as string), cast(encode(cast((id#xL + 2) as string), utf-8) as string), cast(encode(cast((id#xL + 3) as string), utf-8) as string)) AS col#x]
+- Range (0, 10, step=1, splits=None)

== Physical Plan ==
*Project [concat(cast(id#xL as string), cast((id#xL + 1) as string), cast(encode(cast((id#xL + 2) as string), utf-8) as string), cast(encode(cast((id#xL + 3) as string), utf-8) as string)) AS col#x]
+- *Range (0, 10, step=1, splits=2)

0 comments on commit 179c6fd

Please sign in to comment.