Merge pull request apache#274 from palantir/aash/resync-apache
[NOSQUASH] Resync Apache
robert3005 authored Oct 14, 2017
2 parents 46376cb + 5a66611 commit d4c6384
Showing 28 changed files with 468 additions and 60 deletions.
@@ -100,6 +100,16 @@ object TypeCoercion {
case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
Some(TimestampType)

case (t1 @ StructType(fields1), t2 @ StructType(fields2)) if t1.sameType(t2) =>
Some(StructType(fields1.zip(fields2).map { case (f1, f2) =>
// Since `t1.sameType(t2)` is true, the two StructTypes have the same DataType,
// differing at most in `name` (when `spark.sql.caseSensitive=false`) and `nullable`.
// - Different names: use f1.name
// - Different nullabilities: `nullable` is true iff one of them is nullable.
val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable)
}))

case _ => None
}
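As an illustration of the new case, a minimal sketch (not part of the diff; assumes the standard org.apache.spark.sql.types imports and spark.sql.caseSensitive=false):

import org.apache.spark.sql.types._

// The merged struct keeps the left-hand field names and ORs the nullabilities.
val left = StructType(Seq(StructField("a", IntegerType, nullable = false)))
val right = StructType(Seq(StructField("A", IntegerType, nullable = true)))

// left.sameType(right) holds under case-insensitive analysis, so:
// findTightestCommonType(left, right) ==
//   Some(StructType(Seq(StructField("a", IntegerType, nullable = true))))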

@@ -76,7 +76,7 @@ case class CallMethodViaReflection(children: Seq[Expression])
}
}

override def deterministic: Boolean = false
override lazy val deterministic: Boolean = false
override def nullable: Boolean = true
override val dataType: DataType = StringType

@@ -79,7 +79,7 @@ abstract class Expression extends TreeNode[Expression] {
* An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext.
* By default leaf expressions are deterministic as Nil.forall(_.deterministic) returns true.
*/
def deterministic: Boolean = children.forall(_.deterministic)
lazy val deterministic: Boolean = children.forall(_.deterministic)

def nullable: Boolean
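Why the switch from `def` to `lazy val`: determinism is derived recursively from the children, and a `def` re-walks the whole tree on every call. A self-contained sketch of the same pattern (hypothetical `Node` types, not Spark's):

abstract class Node {
  def children: Seq[Node]
  // Computed on first access and cached, mirroring the change above.
  lazy val deterministic: Boolean = children.forall(_.deterministic)
}
case class Leaf(isDet: Boolean) extends Node {
  def children: Seq[Node] = Nil
  override lazy val deterministic: Boolean = isDet
}
case class Branch(children: Seq[Node]) extends Node

// The first call walks the tree once; later calls are O(1) lookups.
assert(!Branch(Seq(Leaf(true), Leaf(false))).deterministic)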

@@ -265,7 +265,7 @@ trait NonSQLExpression extends Expression {
* An expression that is nondeterministic.
*/
trait Nondeterministic extends Expression {
final override def deterministic: Boolean = false
final override lazy val deterministic: Boolean = false
final override def foldable: Boolean = false

@transient
@@ -30,8 +30,9 @@ object ExpressionSet {
}

/**
* A [[Set]] where membership is determined based on a canonical representation of an [[Expression]]
* (i.e. one that attempts to ignore cosmetic differences). See [[Canonicalize]] for more details.
* A [[Set]] where membership is determined based on determinacy and a canonical representation of
* an [[Expression]] (i.e. one that attempts to ignore cosmetic differences).
* See [[Canonicalize]] for more details.
*
* Internally this set uses the canonical representation, but also keeps track of the original
* expressions to ease debugging. Since different expressions can share the same canonical
@@ -46,14 +47,20 @@
* set.contains(1 + a) => true
* set.contains(a + 2) => false
* }}}
*
* Non-deterministic expressions are always considered not to be contained in the [[Set]].
* Adding a non-deterministic expression simply appends it to the original expressions.
* This is consistent with how `semanticEquals` is defined between two expressions.
*/
class ExpressionSet protected(
protected val baseSet: mutable.Set[Expression] = new mutable.HashSet,
protected val originals: mutable.Buffer[Expression] = new ArrayBuffer)
extends Set[Expression] {

protected def add(e: Expression): Unit = {
if (!baseSet.contains(e.canonicalized)) {
if (!e.deterministic) {
originals += e
} else if (!baseSet.contains(e.canonicalized)) {
baseSet.add(e.canonicalized)
originals += e
}
@@ -74,9 +81,13 @@ class ExpressionSet protected(
}

override def -(elem: Expression): ExpressionSet = {
val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized)
new ExpressionSet(newBaseSet, newOriginals)
if (elem.deterministic) {
val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized)
val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized)
new ExpressionSet(newBaseSet, newOriginals)
} else {
new ExpressionSet(baseSet.clone(), originals.clone())
}
}

override def iterator: Iterator[Expression] = originals.iterator
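A behavior sketch mirroring the new test cases below (illustrative; assumes `Rand` from org.apache.spark.sql.catalyst.expressions):

val set = ExpressionSet(Seq(Rand(0)))
set.contains(Rand(0))  // false: non-deterministic expressions are never "in" the set
(set + Rand(0)).size   // 2: the duplicate Rand(0) is appended, not deduplicated
(set - Rand(0)).size   // 1: removing a non-deterministic expression is a no-op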
@@ -49,7 +49,7 @@ case class ScalaUDF(
udfDeterministic: Boolean = true)
extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {

override def deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)
override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)

override def toString: String =
s"${udfName.map(name => s"UDF:$name").getOrElse("UDF")}(${children.mkString(", ")})"
@@ -44,7 +44,7 @@ case class First(child: Expression, ignoreNullsExpr: Expression)
override def nullable: Boolean = true

// First is not a deterministic function.
override def deterministic: Boolean = false
override lazy val deterministic: Boolean = false

// Return data type.
override def dataType: DataType = child.dataType
@@ -44,7 +44,7 @@ case class Last(child: Expression, ignoreNullsExpr: Expression)
override def nullable: Boolean = true

// Last is not a deterministic function.
override def deterministic: Boolean = false
override lazy val deterministic: Boolean = false

// Return data type.
override def dataType: DataType = child.dataType
@@ -44,7 +44,7 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper

// Both `CollectList` and `CollectSet` are non-deterministic since their results depend on the
// actual order of input rows.
override def deterministic: Boolean = false
override lazy val deterministic: Boolean = false

override def update(buffer: T, input: InternalRow): T = {
val value = child.eval(input)
@@ -119,7 +119,7 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable {
// scalastyle:on line.size.limit
case class Uuid() extends LeafExpression {

override def deterministic: Boolean = false
override lazy val deterministic: Boolean = false

override def nullable: Boolean = false

@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
* commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
* eagerly executed.
*/
trait Command extends LeafNode {
trait Command extends LogicalPlan {
override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
}
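A hypothetical minimal implementation under the new hierarchy (illustrative name, not part of the diff):

// Commands now extend LogicalPlan directly with empty output and no children;
// they are executed eagerly instead of being planned like queries.
case class NoopCommand() extends Command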
@@ -311,8 +311,9 @@ object SQLConf {

val PARQUET_OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.parquet.output.committer.class")
.doc("The output committer class used by Parquet. The specified class needs to be a " +
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter.")
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter. If it is not, then metadata summaries" +
"will never be created, irrespective of the value of parquet.enable.summary-metadata")
.internal()
.stringConf
.createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")
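A usage sketch of the documented constraint (assumes a SparkSession in scope as `spark`; the value shown is the default):

// The class must subclass org.apache.hadoop.mapreduce.OutputCommitter; only
// ParquetOutputCommitter subclasses also produce metadata summaries.
spark.conf.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.parquet.hadoop.ParquetOutputCommitter")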
@@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
@@ -80,7 +81,11 @@ abstract class DataType extends AbstractDataType {
* (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
*/
private[spark] def sameType(other: DataType): Boolean =
DataType.equalsIgnoreNullability(this, other)
if (SQLConf.get.caseSensitiveAnalysis) {
DataType.equalsIgnoreNullability(this, other)
} else {
DataType.equalsIgnoreCaseAndNullability(this, other)
}
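A sketch of the effect (illustrative; `sameType` is `private[spark]`, so this only type-checks inside Spark's own packages):

import org.apache.spark.sql.types._

val a = StructType(Seq(StructField("col", IntegerType)))
val b = StructType(Seq(StructField("COL", IntegerType)))
// With spark.sql.caseSensitive=false (the default), a.sameType(b) is now true;
// under case-sensitive analysis it remains false.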

/**
* Returns the same data type but with all nullability fields set to true
@@ -131,14 +131,17 @@ class TypeCoercionSuite extends AnalysisTest {
widenFunc: (DataType, DataType) => Option[DataType],
t1: DataType,
t2: DataType,
expected: Option[DataType]): Unit = {
expected: Option[DataType],
isSymmetric: Boolean = true): Unit = {
var found = widenFunc(t1, t2)
assert(found == expected,
s"Expected $expected as wider common type for $t1 and $t2, found $found")
// Test both directions to make sure the widening is symmetric.
found = widenFunc(t2, t1)
assert(found == expected,
s"Expected $expected as wider common type for $t2 and $t1, found $found")
if (isSymmetric) {
found = widenFunc(t2, t1)
assert(found == expected,
s"Expected $expected as wider common type for $t2 and $t1, found $found")
}
}

test("implicit type cast - ByteType") {
@@ -385,6 +388,47 @@ class TypeCoercionSuite extends AnalysisTest {
widenTest(NullType, StructType(Seq()), Some(StructType(Seq())))
widenTest(StringType, MapType(IntegerType, StringType, true), None)
widenTest(ArrayType(IntegerType), StructType(Seq()), None)

widenTest(
StructType(Seq(StructField("a", IntegerType))),
StructType(Seq(StructField("b", IntegerType))),
None)
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", DoubleType, nullable = false))),
None)

widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", IntegerType, nullable = false))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = false)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", IntegerType, nullable = true))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = true))),
StructType(Seq(StructField("a", IntegerType, nullable = false))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = true))),
StructType(Seq(StructField("a", IntegerType, nullable = true))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
widenTest(
StructType(Seq(StructField("a", IntegerType))),
StructType(Seq(StructField("A", IntegerType))),
None)
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
checkWidenType(
TypeCoercion.findTightestCommonType,
StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType))),
StructType(Seq(StructField("A", IntegerType), StructField("b", IntegerType))),
Some(StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType)))),
isSymmetric = false)
}
}

test("wider common type for decimal and array") {
@@ -175,20 +175,14 @@ class ExpressionSetSuite extends SparkFunSuite {
aUpper > bUpper || aUpper <= Rand(1L) || aUpper <= 10,
aUpper <= Rand(1L) || aUpper <= 10 || aUpper > bUpper)

// Partial reorder case: we don't reorder non-deterministic expressions,
// but we can reorder sub-expressions in deterministic AND/OR expressions.
// There are two predicates:
// (aUpper > bUpper || bUpper > 100) => we can reorder sub-expressions in it.
// (aUpper === Rand(1L))
setTest(1,
// Keep all the non-deterministic expressions even if they are semantically equal.
setTest(2, Rand(1L), Rand(1L))

setTest(2,
(aUpper > bUpper || bUpper > 100) && aUpper === Rand(1L),
(bUpper > 100 || aUpper > bUpper) && aUpper === Rand(1L))

// There are three predicates:
// (Rand(1L) > aUpper)
// (aUpper <= Rand(1L) && aUpper > bUpper)
// (aUpper > 10 && bUpper > 10) => we can reorder sub-expressions in it.
setTest(1,
setTest(2,
Rand(1L) > aUpper || (aUpper <= Rand(1L) && aUpper > bUpper) || (aUpper > 10 && bUpper > 10),
Rand(1L) > aUpper || (aUpper <= Rand(1L) && aUpper > bUpper) || (bUpper > 10 && aUpper > 10))

@@ -219,4 +213,39 @@
assert((initialSet ++ setToAddWithSameExpression).size == 2)
assert((initialSet ++ setToAddWithOutSameExpression).size == 3)
}

test("add single element to set with non-deterministic expressions") {
val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil)

assert((initialSet + (aUpper + 1)).size == 2)
assert((initialSet + Rand(0)).size == 3)
assert((initialSet + (aUpper + 2)).size == 3)
}

test("remove single element to set with non-deterministic expressions") {
val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil)

assert((initialSet - (aUpper + 1)).size == 1)
assert((initialSet - Rand(0)).size == 2)
assert((initialSet - (aUpper + 2)).size == 2)
}

test("add multiple elements to set with non-deterministic expressions") {
val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil)
val setToAddWithSameDeterministicExpression = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil)
val setToAddWithOutSameExpression = ExpressionSet(aUpper + 3 :: aUpper + 4 :: Nil)

assert((initialSet ++ setToAddWithSameDeterministicExpression).size == 3)
assert((initialSet ++ setToAddWithOutSameExpression).size == 4)
}

test("remove multiple elements to set with non-deterministic expressions") {
val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil)
val setToRemoveWithSameDeterministicExpression = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil)
val setToRemoveWithOutSameExpression = ExpressionSet(aUpper + 3 :: aUpper + 4 :: Nil)

assert((initialSet -- setToRemoveWithSameDeterministicExpression).size == 1)
assert((initialSet -- setToRemoveWithOutSameExpression).size == 2)
}

}
@@ -94,6 +94,21 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("combine redundant deterministic filters") {
val originalQuery =
testRelation
.where(Rand(0) > 0.1 && 'a === 1)
.where(Rand(0) > 0.1 && 'a === 1)

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.where(Rand(0) > 0.1 && 'a === 1 && Rand(0) > 0.1)
.analyze

comparePlans(optimized, correctAnswer)
}

test("SPARK-16164: Filter pushdown should keep the ordering in the logical plan") {
val originalQuery =
testRelation
@@ -127,7 +127,7 @@ case class SimpleTypedAggregateExpression(
nullable: Boolean)
extends DeclarativeAggregate with TypedAggregateExpression with NonSQLExpression {

override def deterministic: Boolean = true
override lazy val deterministic: Boolean = true

override def children: Seq[Expression] = inputDeserializer.toSeq :+ bufferDeserializer

@@ -221,7 +221,7 @@ case class ComplexTypedAggregateExpression(
inputAggBufferOffset: Int = 0)
extends TypedImperativeAggregate[Any] with TypedAggregateExpression with NonSQLExpression {

override def deterministic: Boolean = true
override lazy val deterministic: Boolean = true

override def children: Seq[Expression] = inputDeserializer.toSeq

@@ -340,7 +340,7 @@ case class ScalaUDAF(

override def dataType: DataType = udaf.dataType

override def deterministic: Boolean = udaf.deterministic
override lazy val deterministic: Boolean = udaf.deterministic

override val inputTypes: Seq[DataType] = udaf.inputSchema.map(_.dataType)

@@ -180,8 +180,13 @@ object FileFormatWriter extends Logging {
val rdd = if (orderingMatched) {
queryExecution.toRdd
} else {
// SPARK-21165: the `requiredOrdering` is based on the attributes from the analyzed plan,
// and the physical plan may have different attribute ids because the optimizer removed
// some aliases. Here we bind the expressions ahead of time to avoid a potential
// attribute id mismatch.
val orderingExpr = requiredOrdering
.map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns))
SortExec(
requiredOrdering.map(SortOrder(_, Ascending)),
orderingExpr,
global = false,
child = queryExecution.executedPlan).execute()
}
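What the binding step does in isolation, as a sketch (Catalyst internals; the two-column schema is an assumption for illustration):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
// Binding replaces the attribute with an ordinal-based BoundReference, so the
// sort no longer depends on expression ids the optimizer may have rewritten.
val bound = BindReferences.bindReference(b: Expression, Seq(a, b))
// bound == BoundReference(1, IntegerType, nullable = true)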
… (remaining changed files not loaded)
