[SPARK-47583][CORE] SQL core: Migrate logError with variables to structured logging framework

### What changes were proposed in this pull request?

Migrate logError calls with variables in the sql/core module to the structured logging framework. This converts call sites of the API
```
def logError(msg: => String): Unit
```
to
```
def logError(entry: LogEntry): Unit
```
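
For example, one call site from this diff (in CreateDataSourceTableAsSelectCommand) changes as shown below. The snippet is an excerpt rather than standalone code: it assumes the enclosing class mixes in Logging and that table and ex are in scope, and it relies on the imports added by this patch.
```
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.MDC

// Before: variables are interpolated directly into the message string.
logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)

// After: each variable is wrapped in an MDC tag keyed by a LogKey,
// and the message is built with the log"" interpolator.
logError(log"Failed to write to table " +
  log"${MDC(IDENTIFIER, table.identifier.unquotedString)}", ex)
```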

### Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?

Yes, logs from the sql/core module will contain additional MDC entries.

### How was this patch tested?

Compiler and Scala style checks, as well as code review.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45969 from dtenedor/log-error-sql-core.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
dtenedor authored and gengliangwang committed Apr 24, 2024
1 parent 03d4ea6 commit 62dd64a
Showing 9 changed files with 63 additions and 37 deletions.
@@ -34,6 +34,7 @@ object LogKey extends Enumeration {
val ARGS = Value
val BACKUP_FILE = Value
val BATCH_ID = Value
val BATCH_WRITE = Value
val BLOCK_ID = Value
val BLOCK_MANAGER_ID = Value
val BROADCAST_ID = Value
@@ -116,6 +117,7 @@ object LogKey extends Enumeration {
val ESTIMATOR_PARAMETER_MAP = Value
val EVENT_LOOP = Value
val EVENT_QUEUE = Value
val EXCEPTION = Value
val EXECUTE_INFO = Value
val EXECUTE_KEY = Value
val EXECUTION_PLAN_LEAVES = Value
@@ -162,6 +164,7 @@ object LogKey extends Enumeration {
val HIVE_OPERATION_TYPE = Value
val HOST = Value
val HOST_PORT = Value
val IDENTIFIER = Value
val INCOMPATIBLE_TYPES = Value
val INDEX = Value
val INDEX_FILE_NUM = Value
@@ -330,11 +333,13 @@ object LogKey extends Enumeration {
val SPARK_PLAN_ID = Value
val SQL_TEXT = Value
val SRC_PATH = Value
val STAGE_ATTEMPT = Value
val STAGE_ID = Value
val START_INDEX = Value
val STATEMENT_ID = Value
val STATE_STORE_PROVIDER = Value
val STATUS = Value
val STDERR = Value
val STORAGE_LEVEL = Value
val STORAGE_LEVEL_DESERIALIZED = Value
val STORAGE_LEVEL_REPLICATION = Value
@@ -402,6 +407,7 @@ object LogKey extends Enumeration {
val WEIGHTED_NUM = Value
val WORKER_URL = Value
val WRITE_AHEAD_LOG_INFO = Value
val WRITE_JOB_UUID = Value
val XSD_PATH = Value

type LogKey = Value
@@ -27,7 +27,8 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkFiles, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, JsonToStructs, Literal, StructsToJson, UnsafeProjection}
@@ -185,7 +186,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
if (!proc.isAlive) {
val exitCode = proc.exitValue()
if (exitCode != 0) {
logError(stderrBuffer.toString) // log the stderr circular buffer
logError(log"${MDC(STDERR, stderrBuffer.toString)}") // log the stderr circular buffer
throw QueryExecutionErrors.subprocessExitedError(exitCode, stderrBuffer, cause)
}
}
@@ -329,12 +330,13 @@ abstract class BaseScriptTransformationWriterThread extends Thread with Logging
// Javadoc this call will not throw an exception:
_exception = t
proc.destroy()
logError(s"Thread-${this.getClass.getSimpleName}-Feed exit cause by: ", t)
logError(log"Thread-${MDC(CLASS_NAME, this.getClass.getSimpleName)}-Feed " +
log"exit cause by: ", t)
} finally {
try {
Utils.tryLogNonFatalError(outputStream.close())
if (proc.waitFor() != 0) {
logError(stderrBuffer.toString) // log the stderr circular buffer
logError(log"${MDC(STDERR, stderrBuffer.toString)}") // log the stderr circular buffer
}
} catch {
case NonFatal(exceptionFromFinallyBlock) =>
@@ -28,6 +28,8 @@ import scala.util.control.NonFatal

import org.apache.spark.SparkException
import org.apache.spark.broadcast
import org.apache.spark.internal.{MDC, MessageWithContext}
import org.apache.spark.internal.LogKey._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -74,14 +76,14 @@ case class AdaptiveSparkPlanExec(

@transient private val lock = new Object()

@transient private val logOnLevel: ( => String) => Unit = conf.adaptiveExecutionLogLevel match {
case "TRACE" => logTrace(_)
case "DEBUG" => logDebug(_)
case "INFO" => logInfo(_)
case "WARN" => logWarning(_)
case "ERROR" => logError(_)
case _ => logDebug(_)
}
@transient private val logOnLevel: ( => MessageWithContext) => Unit =
conf.adaptiveExecutionLogLevel match {
case "TRACE" => logTrace(_)
case "INFO" => logInfo(_)
case "WARN" => logWarning(_)
case "ERROR" => logError(_)
case _ => logDebug(_)
}

@transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()

@@ -358,8 +360,9 @@ case class AdaptiveSparkPlanExec(
val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
if (newCost < origCost ||
(newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
logOnLevel("Plan changed:\n" +
sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n"))
lazy val plans =
sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n")
logOnLevel(log"Plan changed:\n${MDC(QUERY_PLAN, plans)}")
cleanUpTempTags(newPhysicalPlan)
currentPhysicalPlan = newPhysicalPlan
currentLogicalPlan = newLogicalPlan
@@ -389,7 +392,7 @@
if (shouldUpdatePlan && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
}
logOnLevel(s"Final plan:\n$currentPhysicalPlan")
logOnLevel(log"Final plan:\n${MDC(QUERY_PLAN, currentPhysicalPlan)}")
}

override def executeCollect(): Array[InternalRow] = {
@@ -742,7 +745,8 @@ case class AdaptiveSparkPlanExec(
Some((finalPlan, optimized))
} catch {
case e: InvalidAQEPlanException[_] =>
logOnLevel(s"Re-optimize - ${e.getMessage()}:\n${e.plan}")
logOnLevel(log"Re-optimize - ${MDC(EXCEPTION, e.getMessage())}:\n" +
log"${MDC(QUERY_PLAN, e.plan)}")
None
}
}
@@ -800,7 +804,8 @@ case class AdaptiveSparkPlanExec(
s.cancel()
} catch {
case NonFatal(t) =>
logError(s"Exception in cancelling query stage: ${s.treeString}", t)
logError(log"Exception in cancelling query stage: " +
log"${MDC(QUERY_PLAN, s.treeString)}", t)
}
case _ =>
}
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.command

import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.MDC
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
@@ -70,7 +72,7 @@ case class InsertIntoDataSourceDirCommand(
sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query)).toRdd
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to directory " + storage.locationUri.toString, ex)
logError(log"Failed to write to directory ${MDC(URI, storage.locationUri.toString)}", ex)
throw ex
}

@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command

import java.net.URI

import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.MDC
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
@@ -230,7 +232,8 @@ case class CreateDataSourceTableAsSelectCommand(
dataSource.writeAndRead(mode, query, outputColumnNames)
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
logError(log"Failed to write to table " +
log"${MDC(IDENTIFIER, table.identifier.unquotedString)}", ex)
throw ex
}
}
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{ACTUAL_PARTITION_COLUMN, EXPECTED_PARTITION_COLUMN, PATH}
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -737,10 +737,10 @@ case class RepairTableCommand(
spark.catalog.refreshTable(tableIdentWithDB)
} catch {
case NonFatal(e) =>
logError(s"Cannot refresh the table '$tableIdentWithDB'. A query of the table " +
"might return wrong result if the table was cached. To avoid such issue, you should " +
"uncache the table manually via the UNCACHE TABLE command after table recovering will " +
"complete fully.", e)
logError(log"Cannot refresh the table '${MDC(IDENTIFIER, tableIdentWithDB)}'. " +
log"A query of the table might return wrong result if the table was cached. " +
log"To avoid such issue, you should uncache the table manually via the UNCACHE TABLE " +
log"command after table recovering will complete fully.", e)
}
logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).")
Seq.empty[Row]
@@ -1030,7 +1030,8 @@ object DDLUtils extends Logging {
DataSource.lookupDataSource(provider, SQLConf.get).getConstructor().newInstance()
} catch {
case e: Throwable =>
logError(s"Failed to find data source: $provider when check data column names.", e)
logError(log"Failed to find data source: ${MDC(DATA_SOURCE_PROVIDER, provider)} " +
log"when check data column names.", e)
return
}
source match {
@@ -26,7 +26,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -281,7 +282,7 @@ object FileFormatWriter extends Logging {
// return a set of all the partition paths that were updated during this job
ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty)
} catch { case cause: Throwable =>
logError(s"Aborting job ${description.uuid}.", cause)
logError(log"Aborting job ${MDC(WRITE_JOB_UUID, description.uuid)}.", cause)
committer.abortJob(job)
throw cause
}
@@ -404,7 +405,7 @@ object FileFormatWriter extends Logging {
})(catchBlock = {
// If there is an error, abort the task
dataWriter.abort()
logError(s"Job $jobId aborted.")
logError(log"Job ${MDC(JOB_ID, jobId)} aborted.")
}, finallyBlock = {
dataWriter.close()
})
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.jdk.CollectionConverters._

import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -402,16 +403,17 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
} catch {
case cause: Throwable =>
logError(s"Data source write support $batchWrite is aborting.")
logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} is aborting.")
try {
batchWrite.abort(messages)
} catch {
case t: Throwable =>
logError(s"Data source write support $batchWrite failed to abort.")
logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} " +
log"failed to abort.")
cause.addSuppressed(t)
throw QueryExecutionErrors.writingJobFailedError(cause)
}
logError(s"Data source write support $batchWrite aborted.")
logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} aborted.")
throw cause
}

@@ -472,11 +474,13 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serial

})(catchBlock = {
// If there is an error, abort this writer
logError(s"Aborting commit for partition $partId (task $taskId, attempt $attemptId, " +
s"stage $stageId.$stageAttempt)")
logError(log"Aborting commit for partition ${MDC(PARTITION_ID, partId)} " +
log"(task ${MDC(TASK_ID, taskId)}, attempt ${MDC(TASK_ATTEMPT_ID, attemptId)}, " +
log"stage ${MDC(STAGE_ID, stageId)}.${MDC(STAGE_ATTEMPT, stageAttempt)})")
dataWriter.abort()
logError(s"Aborted commit for partition $partId (task $taskId, attempt $attemptId, " +
s"stage $stageId.$stageAttempt)")
logError(log"Aborted commit for partition ${MDC(PARTITION_ID, partId)} " +
log"(task ${MDC(TASK_ID, taskId)}, attempt ${MDC(TASK_ATTEMPT_ID, attemptId)}, " +
log"stage ${MDC(STAGE_ID, stageId)}.${MDC(STAGE_ATTEMPT, stageAttempt)})")
}, finallyBlock = {
dataWriter.close()
})
@@ -25,6 +25,8 @@ import scala.concurrent.duration.NANOSECONDS
import scala.util.control.NonFatal

import org.apache.spark.{broadcast, SparkException}
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.MDC
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -212,7 +214,7 @@ case class BroadcastExchangeExec(
relationFuture.get(timeout, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]]
} catch {
case ex: TimeoutException =>
logError(s"Could not execute broadcast in $timeout secs.", ex)
logError(log"Could not execute broadcast in ${MDC(TIMEOUT, timeout)} secs.", ex)
if (!relationFuture.isDone) {
sparkContext.cancelJobsWithTag(jobTag)
relationFuture.cancel(true)