-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-21737][YARN]Create communication channel between arbitrary clients and the Spark AM in YARN mode #18978
Closed
Closed
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
14c680c
patch applied with merge conflict fixed
af9d7e3
tested cluster & client mode
26a92dc
scala style fixed
f538483
removed out file
19478d1
init striping
f829584
added auth rpc layer support
ef5aec7
auth rpc debugged
8c46a04
Merge branch 'master' of https://github.com/apache/spark into clientT…
96b06c3
made hadoopSeecurityUils Class
7e6b076
style fixed
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,11 +39,17 @@ | |
import org.apache.commons.crypto.cipher.CryptoCipherFactory; | ||
import org.apache.commons.crypto.random.CryptoRandom; | ||
import org.apache.commons.crypto.random.CryptoRandomFactory; | ||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; | ||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import org.apache.spark.network.util.TransportConf; | ||
|
||
import static org.apache.spark.network.util.HadoopSecurityUtils.decodeMasterKey; | ||
import static org.apache.spark.network.util.HadoopSecurityUtils.getClientToAMSecretKey; | ||
import static org.apache.spark.network.util.HadoopSecurityUtils.getIdentifier; | ||
|
||
/** | ||
* A helper class for abstracting authentication and key negotiation details. This is used by | ||
* both client and server sides, since the operations are basically the same. | ||
|
@@ -54,11 +60,13 @@ class AuthEngine implements Closeable { | |
private static final BigInteger ONE = new BigInteger(new byte[] { 0x1 }); | ||
|
||
private final byte[] appId; | ||
private final char[] secret; | ||
private final byte[] user; | ||
private char[] secret; | ||
private final TransportConf conf; | ||
private final Properties cryptoConf; | ||
private final CryptoRandom random; | ||
|
||
private String clientUser; | ||
private byte[] authNonce; | ||
|
||
@VisibleForTesting | ||
|
@@ -69,13 +77,25 @@ class AuthEngine implements Closeable { | |
private CryptoCipher decryptor; | ||
|
||
AuthEngine(String appId, String secret, TransportConf conf) throws GeneralSecurityException { | ||
this(appId, "",secret, conf); | ||
} | ||
|
||
AuthEngine(String appId, String user, String secret, TransportConf conf) throws GeneralSecurityException { | ||
this.appId = appId.getBytes(UTF_8); | ||
this.user = user.getBytes(UTF_8); | ||
this.conf = conf; | ||
this.cryptoConf = conf.cryptoConf(); | ||
this.secret = secret.toCharArray(); | ||
this.random = CryptoRandomFactory.getCryptoRandom(cryptoConf); | ||
} | ||
|
||
/** | ||
* Returns the user name of the client. | ||
*/ | ||
public String getClientUserName() { | ||
return clientUser; | ||
} | ||
|
||
/** | ||
* Create the client challenge. | ||
* | ||
|
@@ -89,6 +109,7 @@ ClientChallenge challenge() throws GeneralSecurityException, IOException { | |
|
||
this.challenge = randomBytes(conf.encryptionKeyLength() / Byte.SIZE); | ||
return new ClientChallenge(new String(appId, UTF_8), | ||
new String(user, UTF_8), | ||
conf.keyFactoryAlgorithm(), | ||
conf.keyFactoryIterations(), | ||
conf.cipherTransformation(), | ||
|
@@ -106,9 +127,22 @@ ClientChallenge challenge() throws GeneralSecurityException, IOException { | |
*/ | ||
ServerResponse respond(ClientChallenge clientChallenge) | ||
throws GeneralSecurityException, IOException { | ||
SecretKeySpec authKey; | ||
if (conf.isConnectionUsingTokens()) { | ||
// Create a secret from client's token identifier and AM's master key. | ||
ClientToAMTokenSecretManager secretManager = new ClientToAMTokenSecretManager(null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see comment in SparkSaslServer about making this common |
||
decodeMasterKey(new String(secret))); | ||
ClientToAMTokenIdentifier identifier = getIdentifier(clientChallenge.user); | ||
secret = getClientToAMSecretKey(identifier, secretManager); | ||
|
||
clientUser = identifier.getUser().getShortUserName(); | ||
} else { | ||
clientUser = clientChallenge.user; | ||
} | ||
|
||
authKey = generateKey(clientChallenge.kdf, clientChallenge.iterations, clientChallenge.nonce, | ||
clientChallenge.keyLength); | ||
|
||
SecretKeySpec authKey = generateKey(clientChallenge.kdf, clientChallenge.iterations, | ||
clientChallenge.nonce, clientChallenge.keyLength); | ||
initializeForAuth(clientChallenge.cipher, clientChallenge.nonce, authKey); | ||
|
||
byte[] challenge = validateChallenge(clientChallenge.nonce, clientChallenge.challenge); | ||
|
@@ -119,6 +153,7 @@ ServerResponse respond(ClientChallenge clientChallenge) | |
|
||
SecretKeySpec sessionKey = generateKey(clientChallenge.kdf, clientChallenge.iterations, | ||
sessionNonce, clientChallenge.keyLength); | ||
|
||
this.sessionCipher = new TransportCipher(cryptoConf, clientChallenge.cipher, sessionKey, | ||
inputIv, outputIv); | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't a public class, its package level visibility so I don't think we need to add another constructor here.