
[SPARK-30313][CORE] Ensure EndpointRef is available in MasterWebUI/WorkerPage #27010

Closed

HeartSaVioR wants to merge 9 commits into master from SPARK-30313

Conversation

@HeartSaVioR (Contributor) commented Dec 26, 2019

What changes were proposed in this pull request?

This patch fixes the flaky tests "master/worker web ui available" & "master/worker web ui available with reverseProxy" in MasterSuite.

Tracking back from the stack trace below,

19/12/19 13:48:39.160 dispatcher-event-loop-4 INFO Worker: WorkerWebUI is available at http://localhost:8080/proxy/worker-20191219134839-localhost-36054
19/12/19 13:48:39.296 WorkerUI-52072 WARN JettyUtils: GET /json/ failed: java.lang.NullPointerException
java.lang.NullPointerException
        at org.apache.spark.deploy.worker.ui.WorkerPage.renderJson(WorkerPage.scala:39)
        at org.apache.spark.ui.WebUI.$anonfun$attachPage$2(WebUI.scala:91)
        at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:80)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:873)
        at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1623)
        at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95)
        at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)

there's a possible race condition in Dispatcher.registerRpcEndpoint():

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, getMessageLoop(name, endpoint)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
  }
  endpointRefs.put(endpoint, endpointRef)
  endpointRef
}

getMessageLoop() initializes a new Inbox for this endpoint for both DedicatedMessageLoop and SharedMessageLoop, which calls onStart() "asynchronously" and "eventually" by posting an OnStart message. onStart() initializes the UI page instance(s), so the execution of endpointRefs.put() and the initialization of the UI page instance(s) are concurrent.
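
For context, a paraphrased sketch of the relevant part of DedicatedMessageLoop (simplified from the Spark source of that time; not the exact code):

private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher) extends MessageLoop(dispatcher) {

  // The Inbox enqueues an OnStart message for the endpoint on construction.
  private val inbox = new Inbox(name, endpoint)

  // (thread pool setup elided)

  // Marking the inbox active lets a dispatcher thread process OnStart
  // immediately, possibly before registerRpcEndpoint() has reached
  // endpointRefs.put(endpoint, endpointRef).
  setActive(inbox)
}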

MasterPage and WorkerPage retrieve the endpoint ref and store it in a val, assuming the endpoint ref is valid when they're initialized, so in the bad case they can store null as the endpoint ref and never update it.

private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
  private val master = parent.masterEndpointRef

  def getMasterState: MasterStateResponse = {
    master.askSync[MasterStateResponse](RequestMasterState)
  }

private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
  private val workerEndpoint = parent.worker.self

  override def renderJson(request: HttpServletRequest): JValue = {
    val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
    JsonProtocol.writeWorkerState(workerState)
  }

This patch splits finding the right message loop from registering the endpoint to it, and ensures the endpoint ref is set "before" the endpoint is registered to the message loop.

Why are the changes needed?

We observed these test failures on Jenkins; below are the links:

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/115583/testReport/
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/115700/testReport/

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs.

You can also reproduce the bug consistently by adding Thread.sleep(1000) just before endpointRefs.put(endpoint, endpointRef) in Dispatcher.registerRpcEndpoint(...).
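
Concretely, the reproduction amounts to widening the race window inside registerRpcEndpoint (a sketch; the sleep is only for reproducing the bug, not a proposed change):

synchronized {
  // ... existing stopped / putIfAbsent checks; setActive() inside
  // getMessageLoop() already made OnStart processable on a dispatcher thread
}
Thread.sleep(1000) // inserted for reproduction: OnStart now reliably runs while self is still unresolved
endpointRefs.put(endpoint, endpointRef)
endpointRef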

@@ -34,7 +34,12 @@ class MasterWebUI(
   extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"),
     requestedPort, master.conf, name = "MasterUI") with Logging {

-  val masterEndpointRef = master.self
+  val masterEndpointRef = {
@HeartSaVioR (Contributor, Author):

If we don't feel comfortable adding an infinite loop, we can just change it from a val to a def, with a comment that it shouldn't be cached.
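
For illustration, the val-to-def alternative mentioned above would look roughly like this (a sketch, not the merged change):

// Don't cache this in a val: the ref may not be registered yet when the
// web UI is constructed, so resolve it lazily on each access.
def masterEndpointRef: RpcEndpointRef = master.self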

@@ -33,7 +33,12 @@ import org.apache.spark.ui.{UIUtils, WebUIPage}
 import org.apache.spark.util.Utils

 private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
-  private val workerEndpoint = parent.worker.self
+  private val workerEndpoint = {
@HeartSaVioR (Contributor, Author):

Same here: if we don't feel comfortable adding an infinite loop, we can just change it from a val to a def, with a comment that it shouldn't be cached.

@HeartSaVioR (Contributor, Author):

Hmm... does that mean the comment below is broken? The code comment says self will become valid when onStart is called, but that doesn't seem to be true: self becomes valid "around" the time onStart is called, and there's no guarantee that self is valid inside onStart.

/**
 * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
 * called. And `self` will become `null` when `onStop` is called.
 *
 * Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
 * valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
 */
final def self: RpcEndpointRef = {
  require(rpcEnv != null, "rpcEnv has not been initialized")
  rpcEnv.endpointRef(this)
}

@SparkQA commented Dec 26, 2019

Test build #115790 has finished for PR 27010 at commit 223c466.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member) left a comment:

How about adding Dispatcher.this.synchronized protection for endpointRefs in both registerRpcEndpoint() and getRpcEndpointRef()?

Actually, endpointRefs.put(endpoint, endpointRef) used to be under the Dispatcher.this.synchronized protection before #26059.
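
Roughly, that suggestion would look like the sketch below (hypothetical; not what was eventually merged):

// In registerRpcEndpoint(): move the put back inside the lock.
synchronized {
  // ... existing checks ...
  endpointRefs.put(endpoint, endpointRef)
}

// In getRpcEndpointRef(): take the same lock, so a dispatcher thread
// resolving `self` waits until registration has completed.
def getRpcEndpointRef(endpoint: RpcEndpoint): RpcEndpointRef =
  Dispatcher.this.synchronized { endpointRefs.get(endpoint) }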

@HeartSaVioR (Contributor, Author):

Thanks for referring to #26059. I took a quick look and found the actual change relevant to this.

Below is the implementation of registerRpcEndpoint before #26059:

  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      val data = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }

According to the code comment, this code ensures onStart will be called "after" endpointRef is put into endpointRefs. Currently all of these operations are done inside getMessageLoop, so we can't ensure that ordering.

Maybe SharedMessageLoop and DedicatedMessageLoop shouldn't set the Inbox active by themselves, and should instead let Dispatcher initiate it.

@HeartSaVioR (Contributor, Author):

Btw, I found the bug can be reproduced consistently by adding Thread.sleep(1000) just before endpointRefs.put(endpoint, endpointRef) in Dispatcher.registerRpcEndpoint(...). I've also updated the PR description accordingly.

@SparkQA commented Dec 27, 2019

Test build #115823 has finished for PR 27010 at commit 7bcdf29.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author):

I just changed the approach; please take a look. The idea is that the endpoint should only be accessed via its endpoint ref after registerRpcEndpoint is called; the only exception is referring to the ref in onStart. So it's safe to put the endpoint ref into endpointRefs before assigning the endpoint to a message loop, and to remove it when the assignment fails.
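
In code, the approach described above looks roughly like this (a simplified sketch of assignToMessageLoop; names follow the diffs quoted below):

private def assignToMessageLoop(
    name: String,
    endpoint: RpcEndpoint,
    endpointRef: NettyRpcEndpointRef): MessageLoop = {
  // Publish the ref before the endpoint is handed to a message loop, since
  // the loop activates the Inbox (and hence OnStart) during registration.
  endpointRefs.put(endpoint, endpointRef)
  try {
    endpoint match {
      case e: IsolatedRpcEndpoint =>
        new DedicatedMessageLoop(name, e, this)
      case _ =>
        sharedLoop.register(name, endpoint)
        sharedLoop
    }
  } catch {
    case NonFatal(e) =>
      // Roll back so a failure leaves no dangling ref, matching the old behavior.
      endpointRefs.remove(endpoint)
      throw e
  }
}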

@SparkQA commented Dec 27, 2019

Test build #115824 has finished for PR 27010 at commit fff5050.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
// active when registering, and endpointRef must be put into endpointRefs before onStart is
// called. Refer the doc of `RpcEndpoint.self`, as well as `NettyRpcEnv.endpointRef`.
endpointRefs.put(endpoint, endpointRef)
@Ngone51 (Member):

If we can update endpointRefs here, could we also update endpointRefs above getMessageLoop/assignToMessageLoop?

@HeartSaVioR (Contributor, Author):

Sorry, I don't get it. Could you elaborate? If you meant creating endpointRef here, that would be simple to do, but then either we'd need a tuple as the return type or registerRpcEndpoint would have to get the endpointRef from endpointRefs; that looks like no big advantage.

@Ngone51 (Member):

Ok, never mind. I got why you do this (endpointRefs.put(endpoint, endpointRef)) in assignToMessageLoop().

@HeartSaVioR (Contributor, Author):

Yes, that's so it only happens when putIfAbsent actually runs the code for the "absent" case.

@Ngone51 (Member):

You read my mind.

        sharedLoop
    }
  } catch {
    case NonFatal(e) =>
@Ngone51 (Member):

When will we fail?

@HeartSaVioR (Contributor, Author) Dec 27, 2019:

There could be various reasons, as we do non-trivial operations here; but yes, I haven't hit or even imagined any real case. That's defensive code, but it ensures the behavior on failure is the same as before (the ref will not be registered in endpointRefs).

@HeartSaVioR (Contributor, Author):

cc. @vanzin @zsxwing

@@ -68,11 +82,10 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
       if (stopped) {
         throw new IllegalStateException("RpcEnv has been stopped")
       }
-      if (endpoints.putIfAbsent(name, getMessageLoop(name, endpoint)) != null) {
+      if (endpoints.putIfAbsent(name, assignToMessageLoop(name, endpoint, endpointRef)) != null) {
@vanzin (Contributor):

While this solves the issue, I don't think it's quite right. The error path here is wrong, because you'll modify endpointRefs and, more importantly, the message loop. (assignToMessageLoop mutates those, and is called here regardless of whether the endpoint should be registered.)

To be fair, the previous code also has that problem w.r.t. the message loop being modified.

I think it would be safe here to have something like:

def findMessageLoop(endpoint) = {
  // return the right message loop without modification
}

val messageLoop = findMessageLoop(endpoint)
if (endpoints.putIfAbsent(name, messageLoop) != null) {
  throw
}
endpointRefs.put(...)
messageLoop.register(...)

If done inside the synchronized block, that seems to be safe and solves the problem. DedicatedMessageLoop should also implement register and call setActive there, instead of as part of the constructor. To add another small thing, DedicatedMessageLoop will leak a thread pool here in the error case, so maybe the thread pool should also be created in the register implementation.

In fact... since this is inside a synchronized block anyway, you can simplify some of the above by not using putIfAbsent. Just check with containsKey, throw if it already exists, then find the right loop, put it in endpoints and update endpointRefs, then call register(). You'll still need DedicatedMessageLoop.register() to call setActive() at the right time.
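
A possible shape of that suggestion (hypothetical sketch; register() is the method being proposed here, not existing API):

// Inside DedicatedMessageLoop: defer activation out of the constructor, so
// the Dispatcher can update endpointRefs before OnStart becomes runnable.
def register(): Unit = setActive(inbox)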

@HeartSaVioR (Contributor, Author):

To be fair, the previous code also has that problem w.r.t. the message loop being modified.

Yes, that's the reason I just put a band-aid there and renamed the method as well. I tried to make the smallest change, as the goal of this patch is just to fix the thread-safety issue.

But basically I totally agree with your suggestion, especially adding register to DedicatedMessageLoop explicitly and not registering the endpoint in findMessageLoop. Will reflect it. Thanks for the suggestion.

@SparkQA commented Dec 31, 2019

Test build #115976 has finished for PR 27010 at commit ac10f87.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val msgLoop = findMessageLoop(name, endpoint)
endpoints.put(name, msgLoop)
try {
  endpointRefs.put(endpoint, endpointRef)
@vanzin (Contributor):

Hmm. This is all correct but feels a bit overkill. Seems like a simpler version would be:

endpointRefs.put(endpoint, endpointRef)
try {
  endpoints.put(name, getMessageLoop(name, endpoint))
} catch {
  // cleanup endpointRefs
}

Yes, that uses the old getMessageLoop() (which could be inlined here for clarity), but that's ok as long as it's done after the containsKey check. Then you don't even need the changes to the other file.

@HeartSaVioR (Contributor, Author) Jan 2, 2020:

Just to confirm: that's pretty much the patch before ac10f87 (that commit was to reflect a review comment), with additional changes to use containsKey/put instead of putIfAbsent and to inline getMessageLoop (it was assignToMessageLoop before ac10f87, but the name doesn't matter since we will inline it). Could you confirm?

@vanzin (Contributor) Jan 2, 2020:

Hmm, I can't find a way to see the complete patch at a specific commit in the UI, so I'll say "maybe".

The goal is:

  • not modify "endpoints" when checking if the endpoint exists
  • update "endpointRefs" before registering the endpoint's message loop (calling register in the case of the shared loop, or creating the dedicated message loop)

@HeartSaVioR (Contributor, Author):

Thanks for confirming! Will make a change.

@HeartSaVioR (Contributor, Author):

Addressed.

@SparkQA commented Jan 3, 2020

Test build #116059 has finished for PR 27010 at commit 8144b7d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author):

retest this, please

@SparkQA commented Jan 3, 2020

Test build #116090 has finished for PR 27010 at commit 8144b7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

} catch {
  case NonFatal(e) =>
    endpointRefs.remove(endpoint)
    if (messageLoop != null && messageLoop.isInstanceOf[DedicatedMessageLoop]) {
@vanzin (Contributor):

This will never happen, because if an exception is thrown, it will be in the DedicatedMessageLoop constructor, so messageLoop will still be null.

@HeartSaVioR (Contributor, Author):

You're right. I'll remove it.

Btw, have we decided to just ignore the leaked thread pool? The previous change I reverted was required to deal with it, as the thread pool shouldn't be initialized in the constructor. I guess the answer might be yes, as you mentioned it as a "small thing", but it's only a matter of a git rebase, so please let me know if we want to address it.

@vanzin (Contributor):

If anything fails in that constructor, it will be the creation of the thread pool itself, so I don't think anything would leak. Also if that fails, we have bigger problems anyway.

@SparkQA commented Jan 4, 2020

Test build #116106 has finished for PR 27010 at commit 68fcd51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author):

Looks like the related tests are failing more frequently; I've seen them fail twice in a single PR (not sure why). A kind reminder to handle this sooner.

@vanzin (Contributor) commented Jan 6, 2020

Merging to master.

@vanzin closed this in 895e572 on Jan 6, 2020
@HeartSaVioR (Contributor, Author):

Thanks for reviewing and merging!

@HeartSaVioR deleted the SPARK-30313 branch on January 6, 2020 23:30