Skip to content

Commit

Permalink
[SQL] Add an exception for analysis errors.
Browse files Browse the repository at this point in the history
Also start from the bottom so we show the first error instead of the top error.

Author: Michael Armbrust <michael@databricks.com>

Closes #4439 from marmbrus/analysisException and squashes the following commits:

45862a0 [Michael Armbrust] fix hive test
a773bba [Michael Armbrust] Merge remote-tracking branch 'origin/master' into analysisException
f88079f [Michael Armbrust] update more cases
fede90a [Michael Armbrust] newline
fbf4bc3 [Michael Armbrust] move to sql
6235db4 [Michael Armbrust] [SQL] Add an exception for analysis errors.
  • Loading branch information
marmbrus committed Feb 11, 2015
1 parent aaf50d0 commit 6195e24
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

/**
* Thrown when a query fails to analyze, usually because the query itself is invalid.
*/
class AnalysisException(message: String) extends Exception(message) with Serializable
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -80,16 +81,18 @@ class Analyzer(catalog: Catalog,
*/
object CheckResolution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
plan.transform {
plan.transformUp {
case p if p.expressions.exists(!_.resolved) =>
throw new TreeNodeException(p,
s"Unresolved attributes: ${p.expressions.filterNot(_.resolved).mkString(",")}")
val missing = p.expressions.filterNot(_.resolved).map(_.prettyString).mkString(",")
val from = p.inputSet.map(_.name).mkString("{", ", ", "}")

throw new AnalysisException(s"Cannot resolve '$missing' given input columns $from")
case p if !p.resolved && p.childrenResolved =>
throw new TreeNodeException(p, "Unresolved plan found")
throw new AnalysisException(s"Unresolved operator in the query plan ${p.simpleString}")
} match {
// As a backstop, use the root node to check that the entire plan tree is resolved.
case p if !p.resolved =>
throw new TreeNodeException(p, "Unresolved plan in tree")
throw new AnalysisException(s"Unresolved operator in the query plan ${p.simpleString}")
case p => p
}
}
Expand Down Expand Up @@ -314,10 +317,11 @@ class Analyzer(catalog: Catalog,
val checkField = (f: StructField) => resolver(f.name, fieldName)
val ordinal = fields.indexWhere(checkField)
if (ordinal == -1) {
sys.error(
throw new AnalysisException(
s"No such struct field $fieldName in ${fields.map(_.name).mkString(", ")}")
} else if (fields.indexWhere(checkField, ordinal + 1) != -1) {
sys.error(s"Ambiguous reference to fields ${fields.filter(checkField).mkString(", ")}")
throw new AnalysisException(
s"Ambiguous reference to fields ${fields.filter(checkField).mkString(", ")}")
} else {
ordinal
}
Expand All @@ -329,7 +333,8 @@ class Analyzer(catalog: Catalog,
case ArrayType(StructType(fields), containsNull) =>
val ordinal = findField(fields)
ArrayGetField(expr, fields(ordinal), ordinal, containsNull)
case otherType => sys.error(s"GetField is not valid on fields of type $otherType")
case otherType =>
throw new AnalysisException(s"GetField is not valid on fields of type $otherType")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis

import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -69,12 +69,12 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))

val e = intercept[TreeNodeException[_]] {
val e = intercept[AnalysisException] {
caseSensitiveAnalyze(
Project(Seq(UnresolvedAttribute("tBl.a")),
UnresolvedRelation(Seq("TaBlE"), Some("TbL"))))
}
assert(e.getMessage().toLowerCase.contains("unresolved"))
assert(e.getMessage().toLowerCase.contains("cannot resolve"))

assert(
caseInsensitiveAnalyze(
Expand Down Expand Up @@ -109,21 +109,21 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
}

test("throw errors for unresolved attributes during analysis") {
val e = intercept[TreeNodeException[_]] {
val e = intercept[AnalysisException] {
caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("abcd")), testRelation))
}
assert(e.getMessage().toLowerCase.contains("unresolved attribute"))
assert(e.getMessage().toLowerCase.contains("cannot resolve"))
}

test("throw errors for unresolved plans during analysis") {
case class UnresolvedTestPlan() extends LeafNode {
override lazy val resolved = false
override def output = Nil
}
val e = intercept[TreeNodeException[_]] {
val e = intercept[AnalysisException] {
caseSensitiveAnalyze(UnresolvedTestPlan())
}
assert(e.getMessage().toLowerCase.contains("unresolved plan"))
assert(e.getMessage().toLowerCase.contains("unresolved"))
}

test("divide should be casted into fractional types") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
("1" :: "2" :: "3" :: "4" :: "A" :: "B" :: "C" :: "D" :: "E" :: "F" :: Nil).map(Row(_)))
// Column type mismatches where a coercion is not possible, in this case between integer
// and array types, trigger a TreeNodeException.
intercept[TreeNodeException[_]] {
intercept[AnalysisException] {
sql("SELECT data FROM arrayData UNION SELECT 1 FROM arrayData").collect()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.test.TestHive.{sparkContext, jsonRDD, sql}
import org.apache.spark.sql.hive.test.TestHive.implicits._

Expand All @@ -40,7 +41,7 @@ class HiveResolutionSuite extends HiveComparisonTest {
"""{"a": [{"b": 1, "B": 2}]}""" :: Nil)).registerTempTable("nested")

// there are 2 filed matching field name "b", we should report Ambiguous reference error
val exception = intercept[RuntimeException] {
val exception = intercept[AnalysisException] {
sql("SELECT a[0].b from nested").queryExecution.analyzed
}
assert(exception.getMessage.contains("Ambiguous reference to fields"))
Expand Down

0 comments on commit 6195e24

Please sign in to comment.