Commit

fix.

gatorsmile committed Jul 9, 2017
1 parent a0fe32a commit 04ff406
Showing 2 changed files with 24 additions and 0 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -74,6 +74,10 @@ trait CheckAnalysis extends PredicateHelper {
    }
  }

  private def getNumLeafNodes(operator: LogicalPlan): Int = {
    operator.collect { case _: LeafNode => 1 }.sum
  }

  def checkAnalysis(plan: LogicalPlan): Unit = {
    // We transform up and order the rules so as to catch the first possible failure instead
    // of the result of cascading resolution failures.
@@ -100,6 +104,10 @@ trait CheckAnalysis extends PredicateHelper {
            failAnalysis(
              s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}")

          case e @ (_: InputFileName | _: InputFileBlockLength | _: InputFileBlockStart)
            if getNumLeafNodes(operator) > 1 =>
            e.failAnalysis(s"'${e.prettyName}' does not support more than one source")

          case g: Grouping =>
            failAnalysis("grouping() can only be used with GroupingSets/Cube/Rollup")
          case g: GroupingID =>
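As a quick illustration of the new check (a minimal, self-contained sketch, not part of the commit; the local-mode session and table names are illustrative assumptions): calling one of the input-file expressions over a query that reads from more than one leaf relation now fails at analysis time.

import org.apache.spark.sql.{AnalysisException, SparkSession}

object InputFileNameCheckDemo {
  def main(args: Array[String]): Unit = {
    // Local-mode session and the table names below are illustrative assumptions.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("input-file-name-check-demo")
      .getOrCreate()

    spark.range(0, 10).toDF("id").write.saveAsTable("tab1")
    spark.range(0, 10).toDF("id").write.saveAsTable("tab2")

    try {
      // The projection sits on top of two leaf relations, so the new rule in CheckAnalysis rejects it.
      spark.sql("SELECT *, input_file_name() FROM tab1 JOIN tab2 ON tab1.id = tab2.id").collect()
    } catch {
      case e: AnalysisException =>
        println(e.getMessage) // expected to mention: does not support more than one source
    }

    spark.stop()
  }
}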
sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -530,11 +530,27 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
    )
  }

  test("input_file_name, input_file_block_start, input_file_block_length - more than one source") {
    withTable("tab1", "tab2") {
      val data = sparkContext.parallelize(0 to 10).toDF("id")
      data.write.saveAsTable("tab1")
      data.write.saveAsTable("tab2")
      Seq("input_file_name", "input_file_block_start", "input_file_block_length").foreach { func =>
        val e = intercept[AnalysisException] {
          sql(s"SELECT *, $func() FROM tab1 JOIN tab2 ON tab1.id = tab2.id")
        }.getMessage
        assert(e.contains(s"'$func' does not support more than one source"))
      }
    }
  }

test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
withTempPath { dir =>
val data = sparkContext.parallelize(0 to 10).toDF("id")
data.write.parquet(dir.getCanonicalPath)

spark.read.parquet(dir.getCanonicalPath).explain(true)

// Test the 3 expressions when reading from files
val q = spark.read.parquet(dir.getCanonicalPath).select(
input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()"))
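For contrast, a small usage sketch of the single-source case (not part of the commit; it assumes a SparkSession named `spark` and a scratch directory in a String `path`): with one file-based relation the three expressions remain legal, and each row reports the file it came from together with the start offset and length of the block that was read.

import org.apache.spark.sql.functions.{expr, input_file_name}

// `spark` (a SparkSession) and `path` (a scratch directory String) are assumed to exist.
spark.range(0, 10).toDF("id").write.parquet(path)

spark.read.parquet(path)
  .select(
    input_file_name(),                  // path of the file backing the current row
    expr("input_file_block_start()"),   // byte offset of the block being read
    expr("input_file_block_length()"))  // length in bytes of that block
  .show(truncate = false)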
