Skip to content

Commit

Permalink
KE-27048, query scan bytes is incorrect.
Browse files Browse the repository at this point in the history
  • Loading branch information
leozhiyu authored and chenzhx committed Oct 22, 2021
1 parent 0f22c70 commit e03cd53
Show file tree
Hide file tree
Showing 40 changed files with 63 additions and 37 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/kvstore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/sketch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/tags/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ class BarrierTaskContext private[spark] (
this
}

override def addTaskCompletionListenerToHead(listener: TaskCompletionListener): this.type = {
taskContext.addTaskCompletionListenerToHead(listener)
this
}

override def addTaskFailureListener(listener: TaskFailureListener): this.type = {
taskContext.addTaskFailureListener(listener)
this
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ abstract class TaskContext extends Serializable {
})
}

def addTaskCompletionListenerToHead(listener: TaskCompletionListener): TaskContext

def addTaskCompletionListenerToHead[U](f: (TaskContext) => U): TaskContext = {
addTaskCompletionListenerToHead(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
})
}

/**
* Adds a listener to be executed on task failure. Adding a listener to an already failed task
* will result in that listener being called immediately.
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ private[spark] class TaskContextImpl(
this
}

@GuardedBy("this")
override def addTaskCompletionListenerToHead(listener: TaskCompletionListener)
: this.type = synchronized {
if (completed) {
listener.onTaskCompletion(this)
} else {
onCompleteCallbacks.insert(0, listener)
}
this
}

@GuardedBy("this")
override def addTaskFailureListener(listener: TaskFailureListener)
: this.type = synchronized {
Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/docker-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-token-provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kinesis-asl-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kinesis-asl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/spark-ganglia-lgpl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion graphx/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion hadoop-cloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion launcher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion mllib-local/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion mllib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<packaging>pom</packaging>
<name>Spark Project Parent POM</name>
<url>http://spark.apache.org/</url>
Expand Down
2 changes: 1 addition & 1 deletion repl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/mesos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion sql/catalyst/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion sql/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion sql/hive-thriftserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion sql/hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,13 @@ case class HiveTableScanExec(
// Avoid to serialize MetastoreRelation because schema is lazy. (see SPARK-15649)
val outputSchema = schema
rdd.mapPartitionsWithIndexInternal { (index, iter) =>
TaskContext.get().addTaskCompletionListener(new TaskCompletionListener {
TaskContext.get().addTaskCompletionListenerToHead(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
longMetric("readBytes").add(context.taskMetrics().inputMetrics.bytesRead)
}
})


val proj = UnsafeProjection.create(outputSchema)
proj.initialize(index)
iter.map { r =>
Expand Down
2 changes: 1 addition & 1 deletion streaming/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.1.1-kylin-4.x-r25</version>
<version>3.1.1-kylin-4.x-r26</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down

0 comments on commit e03cd53

Please sign in to comment.