[SPARK-49751][CONNECT] Fix deserialization of SparkListenerConnectServiceStarted event

### What changes were proposed in this pull request?

`SparkListenerConnectServiceStarted` was introduced in SPARK-47952, but its `sparkConf` field (of type `SparkConf`) is not serialized properly, which causes event log deserialization to fail in the Spark History Server (SHS).

According to the discussion, we can remove the `sparkConf` field.
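
For illustration, the shape of the event after the field is removed can be sketched as below. This is a minimal sketch, not the Spark source: `SparkListenerEvent` here is a simplified stand-in trait so the snippet runs without Spark on the classpath, and `ConnectEventSketch` and `describe` are hypothetical names introduced only for this example.

```scala
object ConnectEventSketch {
  // Stand-in for org.apache.spark.scheduler.SparkListenerEvent (assumption:
  // simplified so the sketch compiles without a Spark dependency).
  trait SparkListenerEvent

  // Mirrors the event's fields after this change: only plain values
  // (String, Int, Long) and no SparkConf.
  case class SparkListenerConnectServiceStarted(
      hostAddress: String,
      bindingPort: Int,
      eventTime: Long)
      extends SparkListenerEvent

  // Hypothetical listener-side helper: a consumer can still pattern-match
  // on the host, port, and timestamp carried by the event.
  def describe(event: SparkListenerEvent): String = event match {
    case SparkListenerConnectServiceStarted(host, port, time) =>
      s"Spark Connect service started at $host:$port (eventTime=$time)"
    case _ => "unrelated event"
  }
}
```

A listener that previously read `event.sparkConf` would need to get the configuration from another source, for example the `SparkContext` it was registered with.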

### Why are the changes needed?

To fix event log deserialization and restore SHS UI rendering.

### Does this PR introduce _any_ user-facing change?

It fixes an unreleased feature and restores SHS UI rendering of event logs produced by the Connect server.

### How was this patch tested?

Start a Connect server with event logging enabled, then open its UI in the SHS.
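
The settings involved in that manual test can be sketched as follows. The configuration keys are standard Spark settings; the directory value is a placeholder assumption, and `EventLogConfSketch` and `toConfArgs` are hypothetical names used only here. The rendered `--conf` arguments are meant for a spark-submit style launcher such as the Connect server start script; the history server only needs its log directory setting to point at the same location.

```scala
object EventLogConfSketch {
  // Placeholder location (assumption): any directory both servers can read.
  val eventLogDir = "file:///tmp/spark-events"

  // Settings for the Connect server so that it writes an event log.
  val connectServerConf: Seq[(String, String)] = Seq(
    "spark.eventLog.enabled" -> "true",
    "spark.eventLog.dir" -> eventLogDir)

  // Setting for the history server so that it reads the same directory.
  val historyServerConf: Seq[(String, String)] = Seq(
    "spark.history.fs.logDirectory" -> eventLogDir)

  // Renders settings as `--conf key=value` command-line arguments.
  def toConfArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```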

4.0.0-preview2
<img width="1624" alt="image" src="https://github.com/user-attachments/assets/adfcb4d1-9ca2-4060-96db-bcad3c63e9b6">

This PR.
<img width="1624" alt="image" src="https://github.com/user-attachments/assets/13a15ba2-b273-4f52-8212-ad829f5a1aee">

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48200 from pan3793/SPARK-49751.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
pan3793 authored and HyukjinKwon committed Oct 4, 2024
1 parent de9b9c8 commit bd3e2eb
Showing 2 changed files with 4 additions and 18 deletions.
@@ -31,7 +31,7 @@ import io.grpc.protobuf.services.ProtoReflectionService
 import io.grpc.stub.StreamObserver
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, SparkConnectServiceGrpc}
 import org.apache.spark.connect.proto.SparkConnectServiceGrpc.AsyncService
@@ -420,7 +420,7 @@ object SparkConnectService extends Logging {
 
     started = true
     stopped = false
-    postSparkConnectServiceStarted(sc)
+    postSparkConnectServiceStarted()
   }
 
   def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit = synchronized {
@@ -456,13 +456,9 @@ object SparkConnectService extends Logging {
    * Post the event that the Spark Connect service has started. This is expected to be called only
    * once after the service is ready.
    */
-  private def postSparkConnectServiceStarted(sc: SparkContext): Unit = {
+  private def postSparkConnectServiceStarted(): Unit = {
     postServiceEvent(isa =>
-      SparkListenerConnectServiceStarted(
-        hostAddress,
-        isa.getPort,
-        sc.conf,
-        System.currentTimeMillis()))
+      SparkListenerConnectServiceStarted(hostAddress, isa.getPort, System.currentTimeMillis()))
   }
 
   /**
@@ -521,15 +517,12 @@ object SparkConnectService extends Logging {
  *   The host address of the started Spark Connect service.
  * @param bindingPort:
  *   The binding port of the started Spark Connect service.
- * @param sparkConf:
- *   The SparkConf of the active SparkContext that associated with the service.
  * @param eventTime:
  *   The time in ms when the event was generated.
  */
 case class SparkListenerConnectServiceStarted(
     hostAddress: String,
     bindingPort: Int,
-    sparkConf: SparkConf,
     eventTime: Long)
     extends SparkListenerEvent
 
@@ -189,13 +189,6 @@ class SparkConnectServiceInternalServerSuite extends SparkFunSuite with LocalSpa
       // In the meanwhile, no any end event should be posted
       assert(listenerInstance.serviceEndEvents.size() == 0)
 
-      // The listener is able to get the SparkConf from the event
-      val event = listenerInstance.serviceStartedEvents.get(0)
-      assert(event.sparkConf != null)
-      val sparkConf = event.sparkConf
-      assert(sparkConf.contains("spark.driver.host"))
-      assert(sparkConf.contains("spark.app.id"))
-
       // Try to start an already started SparkConnectService
       SparkConnectService.start(sc)
       // The listener should still receive only one started event