Skip to content

Commit

Permalink
Add detailed java docs + reword a few comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Nov 5, 2014
1 parent d1124e4 commit 7b71d8f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@ public class ShuffleSecretManager implements SecretKeyHolder {
private static final String SPARK_SASL_USER = "sparkSaslUser";

/**
* Convert the given string to a byte buffer that can be converted back to a string
* through {@link #bytesToString(ByteBuffer)}. This is used if the external shuffle
* service represents shuffle secrets as bytes buffers instead of strings.
* Convert the given string to a byte buffer. The resulting buffer can be converted back to
* the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external
* shuffle service represents shuffle secrets as byte buffers instead of strings.
*/
public static ByteBuffer stringToBytes(String s) {
  // Encode with the class-wide charset first, then wrap the resulting array without copying.
  byte[] encoded = s.getBytes(UTF8_CHARSET);
  return ByteBuffer.wrap(encoded);
}

/**
* Convert the given byte buffer to a string that can be converted back to a byte
* buffer through {@link #stringToBytes(String)}. This is used if the external shuffle
* service represents shuffle secrets as bytes buffers instead of strings.
* Convert the given byte buffer to a string. The resulting string can be converted back to
* the same byte buffer through {@link #stringToBytes(String)}. This is used if the external
* shuffle service represents shuffle secrets as byte buffers instead of strings.
*/
public static String bytesToString(ByteBuffer b) {
return new String(b.array(), UTF8_CHARSET);
Expand All @@ -63,9 +63,9 @@ public ShuffleSecretManager() {
}

/**
* Register the specified application with its secret.
* Register an application with its secret.
* Executors need to first authenticate themselves with the same secret before
* the fetching shuffle files written by other executors in this application.
* fetching shuffle files written by other executors in this application.
*/
public void registerApp(String appId, String shuffleSecret) {
if (!shuffleSecretMap.contains(appId)) {
Expand All @@ -77,15 +77,15 @@ public void registerApp(String appId, String shuffleSecret) {
}

/**
* Register the specified application with its secret specified as a byte buffer.
* Register an application with its secret specified as a byte buffer.
*/
public void registerApp(String appId, ByteBuffer shuffleSecret) {
  // Decode the secret into its string form, then delegate to the String-based overload.
  String decodedSecret = bytesToString(shuffleSecret);
  registerApp(appId, decodedSecret);
}

/**
* Unregister the specified application along with its secret.
* This is called when an application terminates.
* Unregister an application along with its secret.
* This is called when the application terminates.
*/
public void unregisterApp(String appId) {
if (shuffleSecretMap.contains(appId)) {
Expand All @@ -105,10 +105,10 @@ public String getSaslUser(String appId) {
}

/**
* Return the secret key registered with the specified application.
* This key is used to authenticate the executors in the application
* before they can fetch shuffle files from the external shuffle service.
* If the application is not registered, return null.
* Return the secret key registered with the given application.
* This key is used to authenticate the executors before they can fetch shuffle files
* written by this application from the external shuffle service. If the specified
* application is not registered, return null.
*/
@Override
public String getSecretKey(String appId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,22 @@
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

/**
* External shuffle service used by Spark on Yarn.
* An external shuffle service used by Spark on Yarn.
*
* This is intended to be a long-running auxiliary service that runs in the NodeManager process.
* A Spark application may connect to this service by setting `spark.shuffle.service.enabled`.
* The application also automatically derives the service port through `spark.shuffle.service.port`
* specified in the Yarn configuration. This is so that both the clients and the server agree on
* the same port to communicate on.
*
* The service also optionally supports authentication. This ensures that executors from one
* application cannot read the shuffle files written by those from another. This feature can be
* enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
* Note that the Spark application must also set `spark.authenticate` manually and, unlike in
* the case of the service port, will not inherit this setting from the Yarn configuration. This
* is because an application running on the same Yarn cluster may choose to not use the external
* shuffle service, in which case its setting of `spark.authenticate` should be independent of
* the service's.
*/
public class YarnShuffleService extends AuxiliaryService {
private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
Expand All @@ -58,12 +73,12 @@ public class YarnShuffleService extends AuxiliaryService {
// This is used only if authentication is enabled
private ShuffleSecretManager secretManager;

// Actual server that serves the shuffle files
// The actual server that serves shuffle files
private TransportServer shuffleServer = null;

public YarnShuffleService() {
super("spark_shuffle");
logger.info("Initializing Yarn shuffle service for Spark");
logger.info("Initializing YARN shuffle service for Spark");
}

/**
Expand Down Expand Up @@ -96,10 +111,10 @@ protected void serviceInit(Configuration conf) {
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
shuffleServer = transportContext.createServer(port);
String authEnabledString = authEnabled ? "enabled" : "not enabled";
logger.info("Started Yarn shuffle service for Spark on port {}. " +
logger.info("Started YARN shuffle service for Spark on port {}. " +
"Authentication is {}.", port, authEnabledString);
} catch (Exception e) {
logger.error("Exception in starting Yarn shuffle service for Spark", e);
logger.error("Exception in starting YARN shuffle service for Spark", e);
}
}

Expand Down

0 comments on commit 7b71d8f

Please sign in to comment.