From f5360e761ef161f7e04526b59a4baf53f1cf8cd5 Mon Sep 17 00:00:00 2001
From: Eren Avsarogullari
Date: Sun, 6 Sep 2020 19:23:12 +0800
Subject: [PATCH] [SPARK-32548][SQL] - Add Application attemptId support to SQL Rest API

### What changes were proposed in this pull request?

Currently, Spark's public REST APIs support the application `attemptId`, except for the SQL API. This causes a `no such app: application_X` error when the application has an `attemptId` (e.g. in YARN cluster mode). The existing endpoints and the endpoints requiring `attemptId` support are listed below:

```
// Existing REST endpoints
applications/{appId}/sql
applications/{appId}/sql/{executionId}

// REST endpoints requiring attemptId support
applications/{appId}/{attemptId}/sql
applications/{appId}/{attemptId}/sql/{executionId}
```

This also fixes the following compile warning in `SqlResourceSuite`:

```
[WARNING] [Warn] ~/spark/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala:67: Reference to uninitialized value edges
```

### Why are the changes needed?

Without this change, SQL REST API calls fail with a `no such app: application_X` error when the application has an `attemptId`.

### Does this PR introduce _any_ user-facing change?

No, because the SQL REST API is planned for release with `Spark 3.1`.

### How was this patch tested?

1. New unit tests are added for the existing REST endpoints. `attemptId` does not appear to be set in `local` mode (it is set in `YARN cluster` mode), so a test for the `attemptId` case could not be added (suggestions are welcome).
2. The patch has also been tested manually through both the Spark Core and History Server REST APIs.

Closes #29364 from erenavsarogullari/SPARK-32548.

Authored-by: Eren Avsarogullari
Signed-off-by: Gengliang Wang
---
 .../api/v1/sql/ApiSqlRootResource.scala       |   9 +-
 .../status/api/v1/sql/SqlResourceSuite.scala  |   5 +-
 .../SqlResourceWithActualMetricsSuite.scala   | 127 ++++++++++++++++++
 3 files changed, 136 insertions(+), 5 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
index 5fc7123c9097b..747c05b9b0626 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.status.api.v1.sql
 
-import javax.ws.rs.Path
+import javax.ws.rs.{Path, PathParam}
 
 import org.apache.spark.status.api.v1.ApiRequestContext
 
@@ -25,5 +25,10 @@ import org.apache.spark.status.api.v1.ApiRequestContext
 private[v1] class ApiSqlRootResource extends ApiRequestContext {
 
   @Path("applications/{appId}/sql")
-  def sqlList(): Class[SqlResource] = classOf[SqlResource]
+  def sqlList(@PathParam("appId") appId: String): Class[SqlResource] = classOf[SqlResource]
+
+  @Path("applications/{appId}/{attemptId}/sql")
+  def sqlList(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): Class[SqlResource] = classOf[SqlResource]
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
index 43cca246cc47c..dbc33c47fed51 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -54,12 +54,11 @@ object SqlResourceSuite {
         SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""),
         SQLPlanMetric(SIZE_OF_FILES_READ, 5, ""))))
 
+  val edges: Seq[SparkPlanGraphEdge] = Seq(SparkPlanGraphEdge(3, 2))
+
   val nodesWhenCodegenIsOff: Seq[SparkPlanGraphNode] =
     SparkPlanGraph(nodes, edges).allNodes.filterNot(_.name == WHOLE_STAGE_CODEGEN_1)
 
-  val edges: Seq[SparkPlanGraphEdge] =
-    Seq(SparkPlanGraphEdge(3, 2))
-
   val metrics: Seq[SQLPlanMetric] = {
     Seq(SQLPlanMetric(DURATION, 0, ""),
       SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, ""),
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
new file mode 100644
index 0000000000000..0c0e3ac90510e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.sql
+
+import java.net.URL
+import java.text.SimpleDateFormat
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.HistoryServerSuite.getContentAndCode
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
+import org.apache.spark.sql.test.SharedSparkSession
+
+case class Person(id: Int, name: String, age: Int)
+case class Salary(personId: Int, salary: Double)
+
+/**
+ * SQL Resource public REST API unit tests: run a query and verify the extracted metrics.
+ */
+class SqlResourceWithActualMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
+
+  import testImplicits._
+
+  // Exclude nodes which may not have metrics.
+  val excludedNodes = List("WholeStageCodegen", "Project", "SerializeFromObject")
+
+  implicit val formats = new DefaultFormats {
+    override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
+  }
+
+  override def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.ui.enabled", "true")
+  }
+
+  test("Check Sql Rest Api Endpoints") {
+    // Materialize the result DataFrame.
+    val count = getDF().count()
+    assert(count == 2, s"Expected query count is 2 but received: $count")
+
+    // Spark apps launched in local mode do not seem to have an `attemptId` by default,
+    // so unit tests are only added for the existing (attempt-less) endpoints.
+    val executionId = callSqlRestEndpointAndVerifyResult()
+    callSqlRestEndpointByExecutionIdAndVerifyResult(executionId)
+  }
+
+  private def callSqlRestEndpointAndVerifyResult(): Long = {
+    val url = new URL(spark.sparkContext.ui.get.webUrl +
+      s"/api/v1/applications/${spark.sparkContext.applicationId}/sql")
+    val jsonResult = verifyAndGetSqlRestResult(url)
+    val executionDatas = JsonMethods.parse(jsonResult).extract[Seq[ExecutionData]]
+    assert(executionDatas.size > 0,
+      s"Expected query result size to be greater than 0 but received: ${executionDatas.size}")
+    val executionData = executionDatas.head
+    verifySqlRestContent(executionData)
+    executionData.id
+  }
+
+  private def callSqlRestEndpointByExecutionIdAndVerifyResult(executionId: Long): Unit = {
+    val url = new URL(spark.sparkContext.ui.get.webUrl +
+      s"/api/v1/applications/${spark.sparkContext.applicationId}/sql/${executionId}")
+    val jsonResult = verifyAndGetSqlRestResult(url)
+    val executionData = JsonMethods.parse(jsonResult).extract[ExecutionData]
+    verifySqlRestContent(executionData)
+  }
+
+  private def verifySqlRestContent(executionData: ExecutionData): Unit = {
+    assert(executionData.status == "COMPLETED",
+      s"Expected status is COMPLETED but actual: ${executionData.status}")
+    assert(executionData.successJobIds.nonEmpty,
+      "Expected successJobIds to be non-empty")
+    assert(executionData.runningJobIds.isEmpty,
+      s"Expected runningJobIds to be empty but actual: ${executionData.runningJobIds}")
+    assert(executionData.failedJobIds.isEmpty,
+      s"Expected failedJobIds to be empty but actual: ${executionData.failedJobIds}")
+    assert(executionData.nodes.nonEmpty, "Expected nodes to be non-empty")
+    executionData.nodes.filterNot(node => excludedNodes.contains(node.nodeName)).foreach { node =>
+      assert(node.metrics.nonEmpty, "Expected metrics of nodes to be non-empty")
+    }
+  }
+
+  private def verifyAndGetSqlRestResult(url: URL): String = {
+    val (code, resultOpt, error) = getContentAndCode(url)
+    assert(code == 200, s"Expected HTTP response code is 200 but received: $code for url: $url")
+    assert(resultOpt.nonEmpty, s"Rest result should not be empty for url: $url")
+    assert(error.isEmpty, s"Error message should be empty for url: $url")
+    resultOpt.get
+  }
+
+  private def getDF(): DataFrame = {
+    val person: DataFrame =
+      spark.sparkContext.parallelize(
+        Person(0, "mike", 30) ::
+        Person(1, "jim", 20) :: Nil).toDF()
+
+    val salary: DataFrame =
+      spark.sparkContext.parallelize(
+        Salary(0, 2000.0) ::
+        Salary(1, 1000.0) :: Nil).toDF()
+
+    val salaryDF = salary.withColumnRenamed("personId", "id")
+    val ds = person.join(salaryDF, "id")
+      .groupBy("name", "age", "salary").avg("age", "salary")
+      .filter(_.getAs[Int]("age") <= 30)
+      .sort()
+
+    ds.toDF
+  }
+
+}
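
For reference, below is a minimal sketch of how a client could exercise the new attempt-aware endpoint once this patch is applied. It assumes a History Server listening on `localhost:18080`; the object name and the `appId`/`attemptId` values are hypothetical placeholders, to be replaced with real IDs listed by `/api/v1/applications`.

```scala
import java.net.URL

import scala.io.Source

object SqlRestApiClientSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical IDs; look up real ones via /api/v1/applications.
    val appId = "application_1599400000000_0001"
    val attemptId = "1"
    val base = "http://localhost:18080/api/v1"

    // New endpoint added by this patch: applications/{appId}/{attemptId}/sql.
    // Appending /{executionId} would fetch a single execution instead.
    val url = new URL(s"$base/applications/$appId/$attemptId/sql")

    // Fetch and print the raw JSON list of SQL executions for this attempt.
    val source = Source.fromURL(url)
    try println(source.mkString) finally source.close()
  }
}
```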
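
As background on the `Reference to uninitialized value edges` warning fixed in `SqlResourceSuite`: `val`s in a Scala object or class body are initialized in declaration order, so an earlier `val` that refers to a later one sees that field's default value (zero/null), which scalac can flag at compile time depending on the enabled lint flags. A small illustrative sketch with made-up names, not taken from the patch:

```scala
object InitOrderSketch {
  // `total` reads `base` before `base` has been initialized: fields in a
  // template initialize top to bottom, so `base` is still 0 at this point,
  // and scalac may warn "Reference to uninitialized value base".
  val total: Int = base + 1 // evaluates to 1, not 43

  val base: Int = 42

  // Declaring `base` above `total` (as the patch does by moving `edges`
  // before its first use) removes both the warning and the wrong value.
  def main(args: Array[String]): Unit = println(total) // prints 1
}
```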