Skip to content

Commit

Permalink
fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Jul 11, 2017
1 parent 596ea17 commit 6b48a9e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,13 @@ trait CheckAnalysis extends PredicateHelper {
}
}

private def getNumLeafNodes(operator: LogicalPlan): Int = {
operator.collect { case _: LeafNode => 1 }.sum
private def getNumInputFileBlockSources(operator: LogicalPlan): Int = {
operator match {
case _: LeafNode => 1
// UNION ALL has multiple children, but these children do not concurrently use InputFileBlock.
case u: Union => u.children.map(getNumInputFileBlockSources).sum - u.children.length + 1
case o => o.children.map(getNumInputFileBlockSources).sum
}
}

def checkAnalysis(plan: LogicalPlan): Unit = {
Expand Down Expand Up @@ -105,7 +110,7 @@ trait CheckAnalysis extends PredicateHelper {
s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}")

case e @ (_: InputFileName | _: InputFileBlockLength | _: InputFileBlockStart)
if getNumLeafNodes(operator) > 1 =>
if getNumInputFileBlockSources(operator) > 1 =>
e.failAnalysis(s"'${e.prettyName}' does not support more than one sources")

case g: Grouping =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {

test("input_file_name, input_file_block_start, input_file_block_length - more than one sources") {
withTable("tab1", "tab2") {
val data = sparkContext.parallelize(0 to 10).toDF("id")
val data = sparkContext.parallelize(0 to 9).toDF("id")
data.write.saveAsTable("tab1")
data.write.saveAsTable("tab2")
Seq("input_file_name", "input_file_block_start", "input_file_block_length").foreach { func =>
Expand All @@ -541,6 +541,31 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
}.getMessage
assert(e.contains(s"'$func' does not support more than one sources"))
}

val df = sql(
"""
|SELECT *, input_file_name()
|FROM (SELECT * FROM tab1 UNION ALL SELECT * FROM tab2 UNION ALL SELECT * FROM tab2)
""".stripMargin)
assert(df.count() == 30)

var e = intercept[AnalysisException] {
sql(
"""
|SELECT *, input_file_name()
|FROM (SELECT * FROM tab1 NATURAL JOIN tab2) UNION ALL SELECT * FROM tab2
""".stripMargin)
}.getMessage
assert(e.contains("'input_file_name' does not support more than one sources"))

e = intercept[AnalysisException] {
sql(
"""
|SELECT *, input_file_name()
|FROM (SELECT * FROM tab1 UNION ALL SELECT * FROM tab2) NATURAL JOIN tab2
""".stripMargin)
}.getMessage
assert(e.contains("'input_file_name' does not support more than one sources"))
}
}

Expand Down

0 comments on commit 6b48a9e

Please sign in to comment.