## [SPARK-32548][SQL] Add Application attemptId support to SQL Rest API
### What changes were proposed in this pull request?
Currently, Spark's public REST APIs support the application `attemptId`, except for the SQL API. This causes a `no such app: application_X` error when the application has an `attemptId` (e.g., YARN cluster mode).

The existing endpoints and the endpoints requiring `attemptId` support are listed below:
```
// Existing Rest Endpoints
applications/{appId}/sql
applications/{appId}/sql/{executionId}

// Rest endpoints requiring attemptId support
applications/{appId}/{attemptId}/sql
applications/{appId}/{attemptId}/sql/{executionId}
```
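
For illustration, a client request against the new `attemptId`-aware endpoint could look like the minimal sketch below. The host, port, and IDs are hypothetical (18080 is the default History Server port):

```scala
import scala.io.Source

// Hypothetical application and attempt IDs, as produced in YARN cluster mode.
val appId = "application_1600000000000_0001"
val attemptId = "1"

// Fetch the SQL execution summaries for this specific attempt.
val url = s"http://localhost:18080/api/v1/applications/$appId/$attemptId/sql"
val json = Source.fromURL(url).mkString
println(json) // JSON array of SQL execution data
```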
This PR also fixes the following compile warning in `SqlResourceSuite`:
```
[WARNING] [Warn] ~/spark/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala:67: Reference to uninitialized value edges
```
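The warning arises because `val`s in a Scala object are initialized in declaration order, and `SqlResourceSuite` referenced `edges` before its definition; the forward reference compiles, but reads the field's default (uninitialized) value. A minimal sketch of the same pattern, with hypothetical names:

```scala
object InitOrder {
  // `first` reads `second` before `second` is initialized, so it captures
  // the default value (null for reference types). This is what the
  // "Reference to uninitialized value" warning in the build flags.
  val first: Seq[Int] = second // first == null here, not Seq(1, 2, 3)
  val second: Seq[Int] = Seq(1, 2, 3)
}
```

Declaring `edges` before its first use, as the `SqlResourceSuite` diff below does, removes the forward reference.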
### Why are the changes needed?
Without this change, SQL REST API calls fail with a `no such app: application_X` error when the application has an `attemptId`.

### Does this PR introduce _any_ user-facing change?
Not yet, because the SQL REST API is planned for release in `Spark 3.1`.

### How was this patch tested?
1. New unit tests are added for the existing REST endpoints (the suite can be run as sketched below). `attemptId` does not appear to be set in `local` mode but is set in `YARN cluster mode`, so a test could not be added for the `attemptId` case (suggestions are welcome).
2. The patch has also been tested manually through both the Spark Core and History Server REST APIs.
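
For reference, the new suite can be run locally with sbt's `testOnly`; the module name and invocation below are assumed from the usual Spark developer workflow:

```
build/sbt "sql/testOnly org.apache.spark.status.api.v1.sql.SqlResourceWithActualMetricsSuite"
```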

Closes #29364 from erenavsarogullari/SPARK-32548.

Authored-by: Eren Avsarogullari <erenavsarogullari@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
erenavsarogullari authored and gengliangwang committed Sep 6, 2020
1 parent f556946 commit f5360e7
Showing 3 changed files with 136 additions and 5 deletions.
**`ApiSqlRootResource.scala`**

```diff
@@ -17,13 +17,18 @@

 package org.apache.spark.status.api.v1.sql

-import javax.ws.rs.Path
+import javax.ws.rs.{Path, PathParam}

 import org.apache.spark.status.api.v1.ApiRequestContext

 @Path("/v1")
 private[v1] class ApiSqlRootResource extends ApiRequestContext {

   @Path("applications/{appId}/sql")
-  def sqlList(): Class[SqlResource] = classOf[SqlResource]
+  def sqlList(@PathParam("appId") appId: String): Class[SqlResource] = classOf[SqlResource]
+
+  @Path("applications/{appId}/{attemptId}/sql")
+  def sqlList(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): Class[SqlResource] = classOf[SqlResource]
 }
```
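
The two `sqlList` overloads above are JAX-RS sub-resource locators: returning a `Class[SqlResource]` tells the runtime (Jersey, in Spark's case) to instantiate `SqlResource` and match the remainder of the request path against it, with `{appId}` and `{attemptId}` still available for injection. A minimal standalone sketch of the pattern, with hypothetical resource names (not Spark's actual classes):

```scala
import javax.ws.rs.{GET, Path, PathParam, Produces}
import javax.ws.rs.core.MediaType

// Hypothetical root resource: the locator method only selects the
// sub-resource class; request handling continues in DemoItemResource.
@Path("/demo")
class DemoRootResource {
  @Path("items/{itemId}")
  def items(@PathParam("itemId") itemId: String): Class[DemoItemResource] =
    classOf[DemoItemResource]
}

class DemoItemResource {
  // The {itemId} template matched by the locator is still injectable here.
  @GET
  @Produces(Array(MediaType.APPLICATION_JSON))
  def get(@PathParam("itemId") itemId: String): String =
    s"""{"id": "$itemId"}"""
}
```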
**`SqlResourceSuite.scala`**

```diff
@@ -54,12 +54,11 @@ object SqlResourceSuite {
     SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""),
     SQLPlanMetric(SIZE_OF_FILES_READ, 5, ""))))

+  val edges: Seq[SparkPlanGraphEdge] = Seq(SparkPlanGraphEdge(3, 2))
+
   val nodesWhenCodegenIsOff: Seq[SparkPlanGraphNode] =
     SparkPlanGraph(nodes, edges).allNodes.filterNot(_.name == WHOLE_STAGE_CODEGEN_1)

-  val edges: Seq[SparkPlanGraphEdge] =
-    Seq(SparkPlanGraphEdge(3, 2))
-
   val metrics: Seq[SQLPlanMetric] = {
     Seq(SQLPlanMetric(DURATION, 0, ""),
       SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, ""),
```
**`SqlResourceWithActualMetricsSuite.scala`** (new file, 127 lines)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.status.api.v1.sql

import java.net.URL
import java.text.SimpleDateFormat

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods

import org.apache.spark.SparkConf
import org.apache.spark.deploy.history.HistoryServerSuite.getContentAndCode
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
import org.apache.spark.sql.test.SharedSparkSession

case class Person(id: Int, name: String, age: Int)
case class Salary(personId: Int, salary: Double)

/**
 * Sql Resource Public API Unit Tests running query and extracting the metrics.
 */
class SqlResourceWithActualMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {

  import testImplicits._

  // Exclude nodes which may not have the metrics
  val excludedNodes = List("WholeStageCodegen", "Project", "SerializeFromObject")

  implicit val formats = new DefaultFormats {
    override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
  }

  override def sparkConf: SparkConf = {
    super.sparkConf.set("spark.ui.enabled", "true")
  }

  test("Check Sql Rest Api Endpoints") {
    // Materialize the result DataFrame
    val count = getDF().count()
    assert(count == 2, s"Expected Query Count is 2 but received: $count")

    // Spark apps launched in local mode do not appear to have an `attemptId`
    // by default, so unit tests cover only the existing (attemptId-less) endpoints.
    val executionId = callSqlRestEndpointAndVerifyResult()
    callSqlRestEndpointByExecutionIdAndVerifyResult(executionId)
  }

  private def callSqlRestEndpointAndVerifyResult(): Long = {
    val url = new URL(spark.sparkContext.ui.get.webUrl
      + s"/api/v1/applications/${spark.sparkContext.applicationId}/sql")
    val jsonResult = verifyAndGetSqlRestResult(url)
    val executionDatas = JsonMethods.parse(jsonResult).extract[Seq[ExecutionData]]
    assert(executionDatas.size > 0,
      s"Expected Query Result Size is higher than 0 but received: ${executionDatas.size}")
    val executionData = executionDatas.head
    verifySqlRestContent(executionData)
    executionData.id
  }

  private def callSqlRestEndpointByExecutionIdAndVerifyResult(executionId: Long): Unit = {
    val url = new URL(spark.sparkContext.ui.get.webUrl
      + s"/api/v1/applications/${spark.sparkContext.applicationId}/sql/${executionId}")
    val jsonResult = verifyAndGetSqlRestResult(url)
    val executionData = JsonMethods.parse(jsonResult).extract[ExecutionData]
    verifySqlRestContent(executionData)
  }

  private def verifySqlRestContent(executionData: ExecutionData): Unit = {
    assert(executionData.status == "COMPLETED",
      s"Expected status is COMPLETED but actual: ${executionData.status}")
    assert(executionData.successJobIds.nonEmpty,
      s"Expected successJobIds should not be empty")
    assert(executionData.runningJobIds.isEmpty,
      s"Expected runningJobIds should be empty but actual: ${executionData.runningJobIds}")
    assert(executionData.failedJobIds.isEmpty,
      s"Expected failedJobIds should be empty but actual: ${executionData.failedJobIds}")
    assert(executionData.nodes.nonEmpty, "Expected nodes should not be empty")
    executionData.nodes.filterNot(node => excludedNodes.contains(node.nodeName)).foreach { node =>
      assert(node.metrics.nonEmpty, "Expected metrics of nodes should not be empty")
    }
  }

  private def verifyAndGetSqlRestResult(url: URL): String = {
    val (code, resultOpt, error) = getContentAndCode(url)
    assert(code == 200, s"Expected Http Response Code is 200 but received: $code for url: $url")
    assert(resultOpt.nonEmpty, s"Rest result should not be empty for url: $url")
    assert(error.isEmpty, s"Error message should be empty for url: $url")
    resultOpt.get
  }

  private def getDF(): DataFrame = {
    val person: DataFrame =
      spark.sparkContext.parallelize(
        Person(0, "mike", 30) ::
        Person(1, "jim", 20) :: Nil).toDF()

    val salary: DataFrame =
      spark.sparkContext.parallelize(
        Salary(0, 2000.0) ::
        Salary(1, 1000.0) :: Nil).toDF()

    val salaryDF = salary.withColumnRenamed("personId", "id")
    val ds = person.join(salaryDF, "id")
      .groupBy("name", "age", "salary").avg("age", "salary")
      .filter(_.getAs[Int]("age") <= 30)
      .sort()

    ds.toDF
  }

}
```
