[SPARK-21737][YARN]Create communication channel between arbitrary clients and the Spark AM in YARN mode #18978
Conversation
…oAMRpc Merged with master before making a pr
ok to test
Test build #80801 has finished for PR 18978 at commit
Made one pass through; I ran out of time, so posting what I have now.
We need to update the security.md doc for this as well.
this(appId, "",secret, conf);
}

AuthEngine(String appId, String user, String secret, TransportConf conf) throws GeneralSecurityException {
This isn't a public class; it has package-level visibility, so I don't think we need to add another constructor here.
if (conf != null && conf.isConnectionUsingTokens()) {
  ClientToAMTokenSecretManager secretManager = new ClientToAMTokenSecretManager(null,
    decodeMasterKey(secretKeyHolder.getSecretKey(secretKeyId)));
  ClientToAMTokenIdentifier identifier = getIdentifier(nc.getDefaultName());
It would be nice to avoid importing any of the Hadoop security classes directly here. This is also duplicated code between here and AuthEngine, so perhaps we can create a function in HadoopSecurityUtil that does all of this. It just needs to take the secret key and nc.getDefaultName() and return an object that has the username and the password.
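A minimal sketch of the kind of helper being suggested, assuming a hypothetical HadoopSecurityUtil object and TokenCredentials holder; the names and the exact Hadoop calls used to derive the password are illustrative only, not the PR's actual code:

import java.io.{ByteArrayInputStream, DataInputStream}

import org.apache.hadoop.yarn.security.client.{ClientToAMTokenIdentifier, ClientToAMTokenSecretManager}

// Hypothetical holder for the resolved credentials.
case class TokenCredentials(userName: String, password: Array[Byte])

object HadoopSecurityUtil {
  // Takes the AM's master key and the encoded token identifier (e.g. derived from
  // nc.getDefaultName) and returns the client user name and password, so callers
  // never have to touch the Hadoop security classes directly.
  def getTokenCredentials(masterKey: Array[Byte], identifierBytes: Array[Byte]): TokenCredentials = {
    val identifier = new ClientToAMTokenIdentifier()
    identifier.readFields(new DataInputStream(new ByteArrayInputStream(identifierBytes)))
    val secretManager = new ClientToAMTokenSecretManager(null, masterKey)
    TokenCredentials(identifier.getUser.getShortUserName, secretManager.retrievePassword(identifier))
  }
}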
SecretKeySpec authKey;
if (conf.isConnectionUsingTokens()) {
  // Create a secret from client's token identifier and AM's master key.
  ClientToAMTokenSecretManager secretManager = new ClientToAMTokenSecretManager(null,
See the comment in SparkSaslServer about making this common.
/** Creates an ClientToAMTokenIdentifier from the encoded Base-64 String */
public static ClientToAMTokenIdentifier getIdentifier(String id) throws InvalidToken {
  byte[] tokenId = byteBufToByte(Base64.decode(
can't you just use ByteBuf.array() to convert from ByteBuf to byte[]?
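For reference, a small sketch of the conversion (helper name illustrative): ByteBuf.array() only works for heap buffers, so a general-purpose conversion still needs a copy for direct or sliced buffers.

import io.netty.buffer.ByteBuf

def byteBufToBytes(buf: ByteBuf): Array[Byte] = {
  if (buf.hasArray && buf.arrayOffset == 0 && buf.array().length == buf.readableBytes()) {
    // Heap buffer whose backing array exactly matches the readable content.
    buf.array()
  } else {
    // Direct (or sliced) buffer: copy the readable bytes out.
    val bytes = new Array[Byte](buf.readableBytes())
    buf.getBytes(buf.readerIndex(), bytes)
    bytes
  }
}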
val clientToAMToken = report.getClientToAMToken
val token = ConverterUtils.convertFromYarn(clientToAMToken, serviceAddr)

// Fetch Identifier, secretkey from the report, encode it and Set it in the Security Manager
I would change this comment to just say why we encode it here: encode the token identifier and secret because they can contain binary data.
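For illustration, a minimal sketch of that encoding, assuming java.util.Base64 and a hypothetical helper name: the identifier and password carried by the YARN client-to-AM token are raw bytes, so they are Base64-encoded before being stored as strings in the SecurityManager.

import java.util.Base64

import org.apache.hadoop.security.token.Token
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier

// Encode the token identifier and secret because they can contain binary data.
def encodeClientToAMToken(token: Token[ClientToAMTokenIdentifier]): (String, String) = {
  val encodedIdentifier = Base64.getEncoder.encodeToString(token.getIdentifier)
  val encodedSecret = Base64.getEncoder.encodeToString(token.getPassword)
  (encodedIdentifier, encodedSecret)
}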
import org.apache.spark.network.{BlockDataManager, TransportContext}
import org.apache.spark.network.client.TransportClientBootstrap
import org.apache.spark.network.netty.{NettyBlockRpcServer, SparkTransportConf}
import org.apache.spark.network.sasl.{SaslClientBootstrap, SaslServerBootstrap}
I don't think we need all of these imports; clean them up and remove any unused ones.
sparkConf.set("spark.rpc.connectionUsingTokens", "true")
val rpcEnv =
  RpcEnv.create("yarnDriverClient", Utils.localHostName(), 0, sparkConf, securityManager)
I think you want the clientMode parameter here set to true.
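A sketch of the suggested change, reusing the names from the diff above; with clientMode = true the RpcEnv only makes outbound connections and does not bind a listening port.

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.Utils

val sparkConf = new SparkConf().set("spark.rpc.connectionUsingTokens", "true")
val securityManager = new SecurityManager(sparkConf)
// clientMode = true: connect out to the AM without opening a local server socket.
val rpcEnv = RpcEnv.create(
  "yarnDriverClient", Utils.localHostName(), 0, sparkConf, securityManager, clientMode = true)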
@@ -23,6 +23,8 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

import org.apache.hadoop.io.DataOutputBuffer
Remove these changes since they're not needed.
private[spark] class ClientToAMEndpoint(
    override val rpcEnv: RpcEnv, driverRef: RpcEndpointRef, securityManager: SecurityManager)
  extends RpcEndpoint with Logging {
}
Add a comment that this is intentionally empty for now, until we implement a command that uses it.
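Something along these lines (a sketch only, mirroring the diff above):

import org.apache.spark.SecurityManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}

private[spark] class ClientToAMEndpoint(
    override val rpcEnv: RpcEnv,
    driverRef: RpcEndpointRef,
    securityManager: SecurityManager)
  extends RpcEndpoint with Logging {
  // Intentionally empty for now; message handlers will be added once the first
  // client-to-AM command that uses this endpoint is implemented.
}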
private [spark] object ApplicationMasterMessages {

  case class HelloWorld() extends ApplicationMasterMessage
Remove; I assume this was used for testing.
So, one thing I'm a little confused about is: why are there any changes necessary to the transport library at all? The transport library abstracts away the concept of users / secrets so that the same server can support multiple secrets. This is how the YARN shuffle service works. In my view, to implement this, you can do it in two different ways:
In neither case should there be any need to modify the transport library. Is there any reason why that would not work?
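To illustrate the existing abstraction, a sketch of a per-client SecretKeyHolder; the lookup function here is hypothetical, and how the secret would be derived from the YARN client-to-AM token is left open rather than taken from the PR.

import org.apache.spark.network.sasl.SecretKeyHolder

// The transport layer hands the "app id" to the holder, so per-client users and
// secrets can live entirely behind this interface without changing the library.
class ClientTokenSecretKeyHolder(resolveSecret: String => String) extends SecretKeyHolder {
  override def getSaslUser(appId: String): String = appId
  override def getSecretKey(appId: String): String = resolveSecret(appId)
}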
I have one question about the target of this PR: are we targeting a specific client-to-AM endpoint which only exists in YARN cluster mode, or a general client-to-driver communication channel? For example, if we're going to support pushing tokens from an external client, then I think it should be orthogonal to cluster managers, as should things like querying application state.
This part is specific to YARN because how clients talk to the various cluster managers is all different. I agree the interface for pushing tokens should be generic, but how that happens from a client to the Spark application is going to be a cluster-manager-specific implementation. In this case this client-to-AM RPC can be used for the pushing; once it's at the driver it can be generic to reload and push to the executors. I also think things like status and kill are going to be different depending on your cluster manager. Yes, there is the generic Spark driver-type kill, but unregistering for YARN, for instance, will be cluster specific, and the YARN status is different from the status of the driver. We could use the REST server here like Mesos and standalone, but the problem is that doesn't satisfy all the use cases and setups. For instance, if someone doesn't have a good UI authentication mechanism like SPNEGO, it's hard to use headless users, so the RPC for YARN is a better fit.
Regardless of whether it's possible to re-use code for other cluster managers, I think Saisai's initial question makes sense: should this be a client-to-AM channel or a client-to-driver channel? I see a client-to-driver connection as more useful; it also avoids opening a listen socket on the AM. For anything that the driver needs the AM's help with, the YARN-specific implementation can send a message to the currently registered AM. In the end, you can achieve any desired goal with either approach, but making it client-to-driver from the start might make it simpler to handle both client and cluster mode (I think), and also might make it simpler to have similar functionality for other cluster managers later.
I don't see too much difference whether it goes to the driver or the AM; either way the AM and driver have to talk after the initial connection. For something like a kill command it's actually probably better to go to the AM first, because in client mode that way you know for certain on the YARN side that you got a kill command and you don't need to reattempt. But you should also be able to go to the driver and have the driver tell the AM it got the kill, as long as it doesn't shut down or for some reason can't get the message out to the AM. I don't have too strong an opinion on which way we go, other than I would have to look at how it would be conditionalized right now for YARN only. If we think it will be useful for other cluster managers then we can look at switching to go to the driver first. I was kind of figuring those other ones would use the REST server to do that.
The REST server does not talk to applications; it is sort of the cluster manager master (what would be the YARN RM), so that's a different thing. I guess you could implement this as "send the request to the REST server and it then forwards the request to the driver", but I'm not familiar with how Spark apps talk to the REST server, if at all.
Also, regardless of where the client will connect, I think the more important question here is why any changes are needed to the network library at all. I don't see how the current code would not work if you have a proper implementation of
I'm inclined to build a general client-driver channel instead of a specific client-AM channel. I'm not sure how many differences there are between the two, but I built the former before (without SASL) and it didn't seem to require much code. My concern is that later, when we need to support a general client-driver channel, we will need another refactoring to support it; why don't we do it from the beginning? This happened when I built a credential manager in YARN: it was later moved to core for Mesos, and that move brought additional effort and also left a regression in the code.
@vanzin I still need to look at the network library thing, just haven't had a chance. And you are right, I missed that it wasn't using the REST server to talk to the apps either. The Mesos one is using the Mesos scheduler kill command. Yeah, I'm definitely fine changing to the client/driver channel.
Any updates here? Or should we close this PR until this is properly investigated?
We can close for now; I got busy with some other stuff and hope to get back to this soon.
What changes were proposed in this pull request?
In this JIRA, I develop code to create a communication channel between arbitrary clients and a Spark AM on YARN. This code can be used to send commands such as getting application status, getting history info from the CLI, killing the application, and pushing new tokens.
Design Doc:
https://docs.google.com/document/d/1QMbWhg13ocIoADywZQBRRVj-b9Zf8CnBrruP5JhcOOY/edit?usp=sharing
How was this patch tested?
Tested by sending a Kill RPC message to an AM.
Worked for both the SASL and auth layers.
Code used to test the communication channel.
https://github.com/yoonlee95/spark/tree/authkill
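For illustration only (this is not the linked test code), a rough sketch of how an external client could drive such a channel: create a client-mode RpcEnv, resolve the AM's RPC address, and send a hypothetical Kill message. The endpoint name, message type, and argument handling are made up here.

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.util.Utils

case class Kill()  // hypothetical client-to-AM command

object ClientToAMExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    // Client-mode RpcEnv: connects out to the AM, no local listening port.
    val rpcEnv = RpcEnv.create(
      "yarnAppClient", Utils.localHostName(), 0, conf, securityManager, clientMode = true)
    // In practice the AM host/port would come from the YARN application report.
    val amEndpoint = rpcEnv.setupEndpointRef(RpcAddress(args(0), args(1).toInt), "ClientToAM")
    amEndpoint.send(Kill())
    rpcEnv.shutdown()
  }
}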