[CARMEL-3569][SPARK-30842] Adjust abstraction structure for join operators (#44)
Luan, Xuedong authored and allenma committed Aug 19, 2020
1 parent 174366c commit 2db2ee9
Showing 7 changed files with 81 additions and 61 deletions.
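In brief: the commit extracts a new BaseJoinExec trait that centralizes the EXPLAIN-related logic (simpleStringWithNodeId and verboseStringWithOperatorId) previously duplicated across the physical join operators, then rebases all five operators on it. A condensed sketch of the resulting hierarchy, with method bodies elided (everything here is restated from the diff below; BuildSide lives in org.apache.spark.sql.execution.joins in this Spark version). The per-file headers below are reconstructed from the package and class names, following Spark's standard source layout.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.execution.BinaryExecNode

trait BaseJoinExec extends BinaryExecNode {
  def joinType: JoinType
  def condition: Option[Expression]
  def leftKeys: Seq[Expression]
  def rightKeys: Seq[Expression]
  // plus shared simpleStringWithNodeId() / verboseStringWithOperatorId()
}

trait HashJoin extends BaseJoinExec {
  def buildSide: BuildSide  // mixed into BroadcastHashJoinExec and ShuffledHashJoinExec
}

// SortMergeJoinExec           extends BaseJoinExec with CodegenSupport
// BroadcastNestedLoopJoinExec extends BaseJoinExec  (leftKeys/rightKeys = Nil)
// CartesianProductExec        extends BaseJoinExec  (joinType = Inner, keys = Nil)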
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BaseJoinExec.scala (new file)
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.execution.{BinaryExecNode, ExplainUtils}
+
+/**
+ * Holds common logic for join operators
+ */
+trait BaseJoinExec extends BinaryExecNode {
+  def joinType: JoinType
+  def condition: Option[Expression]
+  def leftKeys: Seq[Expression]
+  def rightKeys: Seq[Expression]
+
+  override def simpleStringWithNodeId(): String = {
+    val opId = ExplainUtils.getOpId(this)
+    s"$nodeName $joinType ($opId)".trim
+  }
+
+  override def verboseStringWithOperatorId(): String = {
+    val joinCondStr = if (condition.isDefined) {
+      s"${condition.get}"
+    } else "None"
+    if (leftKeys.nonEmpty || rightKeys.nonEmpty) {
+      s"""
+         |$formattedNodeName
+         |${ExplainUtils.generateFieldString("Left keys", leftKeys)}
+         |${ExplainUtils.generateFieldString("Right keys", rightKeys)}
+         |${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
+         |""".stripMargin
+    } else {
+      s"""
+         |$formattedNodeName
+         |${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
+         |""".stripMargin
+    }
+  }
+}
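For context, these two methods feed EXPLAIN FORMATTED: simpleStringWithNodeId renders the one-line plan-tree entry, and verboseStringWithOperatorId renders the per-operator detail block. The fragment below is an illustrative sketch of the rendered text for a sort-merge join; the operator id (5) and expression ids (id#0L, id#4L) are made up for the example, and the exact field layout comes from ExplainUtils.generateFieldString as used above.

SortMergeJoin Inner (5)          <- simpleStringWithNodeId
...
(5) SortMergeJoin                <- verboseStringWithOperatorId
Left keys [1]: [id#0L]
Right keys [1]: [id#4L]
Join condition: None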
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, SparkPlan}
+import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.{BooleanType, LongType}

@@ -44,7 +44,7 @@ case class BroadcastHashJoinExec(
     condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan)
-  extends BinaryExecNode with HashJoin with CodegenSupport {
+  extends HashJoin with CodegenSupport {
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.collection.{BitSet, CompactBuffer}

@@ -32,7 +32,10 @@ case class BroadcastNestedLoopJoinExec(
     right: SparkPlan,
     buildSide: BuildSide,
     joinType: JoinType,
-    condition: Option[Expression]) extends BinaryExecNode {
+    condition: Option[Expression]) extends BaseJoinExec {
+
+  override def leftKeys: Seq[Expression] = Nil
+  override def rightKeys: Seq[Expression] = Nil
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -43,6 +43,11 @@ case class BroadcastNestedLoopJoinExec(
     case BuildLeft => (right, left)
   }
 
+  override def simpleStringWithNodeId(): String = {
+    val opId = ExplainUtils.getOpId(this)
+    s"$nodeName $joinType ${buildSide} ($opId)".trim
+  }
+
   override def requiredChildDistribution: Seq[Distribution] = buildSide match {
     case BuildLeft =>
       BroadcastDistribution(IdentityBroadcastMode) :: UnspecifiedDistribution :: Nil
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -22,7 +22,8 @@ import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, Predicate, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
-import org.apache.spark.sql.execution.{BinaryExecNode, ExplainUtils, ExternalAppendOnlyUnsafeRowArray, SparkPlan}
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.CompletionIterator

@@ -60,23 +61,16 @@ class UnsafeCartesianRDD(
 case class CartesianProductExec(
     left: SparkPlan,
     right: SparkPlan,
-    condition: Option[Expression]) extends BinaryExecNode {
+    condition: Option[Expression]) extends BaseJoinExec {
+
+  override def joinType: JoinType = Inner
+  override def leftKeys: Seq[Expression] = Nil
+  override def rightKeys: Seq[Expression] = Nil
   override def output: Seq[Attribute] = left.output ++ right.output
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
-  override def verboseStringWithOperatorId(): String = {
-    val joinCondStr = if (condition.isDefined) {
-      s"${condition.get}"
-    } else "None"
-
-    s"""
-       |$formattedNodeName
-       |${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
-     """.stripMargin
-  }
-
   protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
 
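Worth noting: because CartesianProductExec now fixes joinType to Inner and returns Nil for both key sequences, the inherited BaseJoinExec.verboseStringWithOperatorId takes its keyless branch and prints only the join condition, which is essentially the same text the deleted override produced.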
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -22,39 +22,18 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{ExplainUtils, RowIterator, SparkPlan}
+import org.apache.spark.sql.execution.{ExplainUtils, RowIterator}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.{IntegralType, LongType}
 
-trait HashJoin {
-  self: SparkPlan =>
-
-  val leftKeys: Seq[Expression]
-  val rightKeys: Seq[Expression]
-  val joinType: JoinType
-  val buildSide: BuildSide
-  val condition: Option[Expression]
-  val left: SparkPlan
-  val right: SparkPlan
+trait HashJoin extends BaseJoinExec {
+  def buildSide: BuildSide
 
   override def simpleStringWithNodeId(): String = {
     val opId = ExplainUtils.getOpId(this)
     s"$nodeName $joinType ${buildSide} ($opId)".trim
   }
 
-  override def verboseStringWithOperatorId(): String = {
-    val joinCondStr = if (condition.isDefined) {
-      s"${condition.get}"
-    } else "None"
-
-    s"""
-       |$formattedNodeName
-       |${ExplainUtils.generateFieldString("Left keys", leftKeys)}
-       |${ExplainUtils.generateFieldString("Right keys", rightKeys)}
-       |${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
-       |""".stripMargin
-  }
-
   override def output: Seq[Attribute] = {
     joinType match {
       case _: InnerLike =>
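The net effect in HashJoin: the SparkPlan self-type and the seven abstract vals are gone, since BaseJoinExec is already a BinaryExecNode (which supplies left and right) and declares joinType, condition, leftKeys, and rightKeys. Only buildSide remains for the hash-join variants to define, and the duplicated verboseStringWithOperatorId moves up into the base trait.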
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -39,7 +39,7 @@ case class ShuffledHashJoinExec(
     condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan)
-  extends BinaryExecNode with HashJoin {
+  extends HashJoin {
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -41,7 +41,7 @@ case class SortMergeJoinExec(
     condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan,
-    isSkewJoin: Boolean = false) extends BinaryExecNode with CodegenSupport {
+    isSkewJoin: Boolean = false) extends BaseJoinExec with CodegenSupport {
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -52,23 +52,6 @@ case class SortMergeJoinExec(
 
   override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).iterator
 
-  override def simpleStringWithNodeId(): String = {
-    val opId = ExplainUtils.getOpId(this)
-    s"$nodeName $joinType ($opId)".trim
-  }
-
-  override def verboseStringWithOperatorId(): String = {
-    val joinCondStr = if (condition.isDefined) {
-      s"${condition.get}"
-    } else "None"
-    s"""
-       |$formattedNodeName
-       |${ExplainUtils.generateFieldString("Left keys", leftKeys)}
-       |${ExplainUtils.generateFieldString("Right keys", rightKeys)}
-       |${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
-       |""".stripMargin
-  }
-
   override def output: Seq[Attribute] = {
     joinType match {
       case _: InnerLike =>
