[SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order #43627
Conversation
I'll have to figure out the CI. It seems my fork is running things, but I am getting some failures on this page.
The MiMa tests are failing due to:
Which makes sense, since I changed it. I am not entirely sure if adding this to the excludes is the right fix.
Ok, I believe given the MiMa code that I need to add a temporary skip here: https://github.com/apache/spark/blob/master/project/MimaExcludes.scala#L37.
@tgravescs fyi
Overall this looks fine; it does complicate the initialization a bit, but I'm not sure I see a better way to handle it. It would be good to get more eyes on it.
cc @mridulm since I think you looked at shuffle related stuff in past.
```diff
@@ -71,6 +69,9 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {

+  // We set the ShuffleManager in SparkContext and Executor
```
nit: update the comment to say something like: the ShuffleManager is initialized later in ... to allow it to be defined in user-specified jars.
```diff
-        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, sc.env.shuffleManager,
-        isDriver = true)),
+        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker,
+        sc.env.shuffleManager.shuffleBlockResolver.getBlocksForShuffle, true)),
```
Put back `isDriver = true` as the last parameter.
```diff
     val blockTransferService: BlockTransferService,
     securityManager: SecurityManager,
     externalBlockStoreClient: Option[ExternalBlockStoreClient])
   extends BlockDataManager with BlockEvictionHandler with Logging {

+  // this is set after the ShuffleManager is instantiated in SparkContext and Executor
+  private var shuffleManager: ShuffleManager = _
```
Update the description above to mention having to set the shuffle manager as well.
… in BM replication suite
@tgravescs thanks for the review. I have handled your comments in this commit: 0bd7e99
Given there is a way for users to specify this reasonably right now, the amount of change to add support for this looks like a bit much.
Thoughts @tgravescs ?
```diff
@@ -71,6 +69,10 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {

+  // We initialize the ShuffleManager later, in SparkContext and Executor, to allow
+  // user jars to define custom ShuffleManagers.
+  var shuffleManager: ShuffleManager = _
```
Given `SparkEnv` is a `DeveloperApi`, let us not expose this for mutation.
```diff
-  var shuffleManager: ShuffleManager = _
+  private var _shuffleManager: ShuffleManager = _
+  def shuffleManager: ShuffleManager = _shuffleManager
```
Will fix !!
@mridulm thanks for the comments. I have published a SPIP here https://issues.apache.org/jira/browse/SPARK-45792 that aims to show the bigger picture. Without the change of initialization order in this PR, we couldn't carry out the linked SPIP, because the ShuffleManager is initialized really early in the executors today. I split this up into a separate PR to not introduce too much change at once, but your point is well taken. I would like to hear your thoughts around the SPIP and how we can proceed. Note there is an alternative I can easily try: instantiate a ShuffleManager wrapper, which would remove the change to the SparkEnv (we would instantiate the wrapper instead of the actual impl). We could then set the impl on this wrapper at a later time, when jars are localized and plugins are loaded. This felt a bit worse than the approach I have in this PR, but I am happy to hear opinions. Thanks again!!
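For illustration, the wrapper alternative described above could be sketched roughly like this. This is a hedged sketch, not code from this PR: `ShuffleLike`, `UserJarShuffleManager`, and `DelegatingShuffleManager` are hypothetical stand-ins for Spark's actual `ShuffleManager` API.

```scala
// Sketch of the wrapper alternative: SparkEnv would be handed the delegating
// instance at construction time, and the real implementation would be injected
// later, once user jars are localized and plugins are loaded.
// `ShuffleLike` is a stand-in trait, not Spark's real ShuffleManager.
trait ShuffleLike {
  def registerShuffle(shuffleId: Int): String
}

// Stands in for a ShuffleManager implementation shipped in a user jar.
class UserJarShuffleManager extends ShuffleLike {
  override def registerShuffle(shuffleId: Int): String = s"registered $shuffleId"
}

class DelegatingShuffleManager extends ShuffleLike {
  @volatile private var delegate: ShuffleLike = null

  // Called once, after jars are localized and plugins are loaded.
  def setDelegate(impl: ShuffleLike): Unit = {
    require(delegate == null, "delegate already set")
    delegate = impl
  }

  // Every method forwards to the delegate; calling before injection fails fast.
  override def registerShuffle(shuffleId: Int): String = {
    require(delegate != null, "ShuffleManager not initialized yet")
    delegate.registerShuffle(shuffleId)
  }
}
```

Every `ShuffleManager` method would have to forward to the delegate the same way, which is the extra boilerplate that arguably makes this option less attractive than changing the initialization order.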
```diff
+/**
+ * Utility companion object to create a ShuffleManager given a spark configuration.
+ */
+private[spark] object ShuffleManager {
```
Shall we put the companion object last?
```diff
@@ -402,7 +405,7 @@ object SparkEnv extends Logging {
           None
         }, blockManagerInfo,
         mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
-        shuffleManager,
+        shuffleBlockGetterFn,
```
Why not define `shuffleBlockGetterFn` in `BlockManagerMasterEndpoint`?
I see this being an issue in tests where the `SparkEnv` would not be set, so now I'd have to make sure that the env is set and cleared in the tests. That said, if you feel strongly about this, I can look at this more.
While I am not opposed to a way to create a short name for shuffle manager, if it results in nontrivial changes to Spark, I am not very inclined towards it.
@beliefer thanks for the comments, I handled most of your comments in the last commit (except for the one about the function passing, but we can discuss that one more there).
I agree that ideally we would finish SPARK-25299, but I don't see that happening anytime soon. I also don't think it covers the case of people replacing the entire ShuffleManager vs just the storage piece. The ShuffleManager API isn't public either, but we have multiple implementations doing that now (Uber's RSS, project Gluten, Spark Rapids, I thought Cosco was although it's not open source, etc.). Overall, while there are a bunch of changes here, most of it is just moving initialization stuff around that shouldn't impact anything else. The one thing that is user-impacting is the SparkEnv API change, which if we only do with 4.0 shouldn't be a big deal, unless there is some usage I'm not aware of. @mridulm Is there a specific part you are concerned with?
@tgravescs @mridulm @beliefer I made a small tweak where the
@tgravescs The SparkEnv related changes, |
I am yet to test my comments, but you can reference a version with the changes here (SparkSubmitSuite passes).
```diff
@@ -627,6 +631,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     _ui.foreach(_.setAppId(_applicationId))
     _env.blockManager.initialize(_applicationId)
+    _env.blockManager.setShuffleManager(shuffleManager)
     FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf)
```
We only need `_env.initiailzeShuffleManager()` (to replace `env.setShuffleManager`) in this class - we can revert the rest.
```diff
+  private[spark] def setShuffleManager(shuffleManager: ShuffleManager): Unit = {
+    _shuffleManager = shuffleManager
+  }
```
Instead of setting it, expose an initialize method. We use `initializeShuffleManager` in driver/executor after the classpath has been fixed up (see more below in the comment for `shuffleBlockGetterFn`).
```diff
-  private[spark] def setShuffleManager(shuffleManager: ShuffleManager): Unit = {
-    _shuffleManager = shuffleManager
-  }
+  private[spark] def initiailzeShuffleManager(): Unit = {
+    Preconditions.checkState(null == _shuffleManager,
+      "Shuffle manager already initialized to %s", _shuffleManager)
+    _shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER)
+  }
```
```diff
     val env = SparkEnv.get
     env.shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapId)
   }
```
Drop this?

In `BlockManagerMasterEndpoint`:
- We change the constructor to: `private val _shuffleManager: ShuffleManager,`
- And add a field: `private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)`

Do the same for `BlockManager` as well. See more below in `create`.
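The constructor-default-plus-lazy-fallback wiring suggested above can be sketched in isolation. This is a hedged sketch with stand-in types (`ShuffleLike`, `FakeEnv`, `Endpoint`), not Spark's actual classes:

```scala
// Stand-in for the real ShuffleManager; only the fallback wiring matters here.
trait ShuffleLike
class DriverShuffle extends ShuffleLike
class EnvShuffle extends ShuffleLike

// Mimics SparkEnv.get.shuffleManager as the late-bound source of truth.
object FakeEnv { var shuffleManager: ShuffleLike = null }

// On the driver the constructor argument is non-null and wins; on executors
// null is passed, and the lazy val falls back to the env at first use -- by
// which time the shuffle manager has been initialized from user jars.
class Endpoint(_shuffleManager: ShuffleLike) {
  private lazy val shuffleManager: ShuffleLike =
    Option(_shuffleManager).getOrElse(FakeEnv.shuffleManager)

  def resolve(): ShuffleLike = shuffleManager
}
```

The `lazy val` is what lets the endpoint be constructed before the env's shuffle manager exists, as long as no block resolution happens before initialization.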
Looking at this later, preserving this is mainly to minimize test code changes, and allow for a way to override it.
```diff
       shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
     val shuffleManager = Utils.instantiateSerializerOrShuffleManager[ShuffleManager](
       shuffleMgrClass, conf, isDriver)

     val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)
```
Instead, do:
`val shuffleManager: ShuffleManager = if (isDriver) ShuffleManager.create(conf, true) else null`
and keep the rest of this method the same. Simply pass `shuffleManager = null`
```diff
+  private val shuffleManager =
+    Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
+      ShuffleManager.create(conf, true)
+    }
+
+  env.setShuffleManager(shuffleManager)
+  env.blockManager.setShuffleManager(shuffleManager)
```
```diff
-  private val shuffleManager =
-    Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
-      ShuffleManager.create(conf, true)
-    }
-
-  env.setShuffleManager(shuffleManager)
-  env.blockManager.setShuffleManager(shuffleManager)
+  if (!isLocal) {
+    Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
+      env.initiailzeShuffleManager()
+    }
+  }
```
I have not tested this, but I think this should work. If it does not, most of my suggestions will need to be discarded :-)
```diff
@@ -81,9 +81,10 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
       conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1))
```
With the proposed changes, we can revert all changes to this file
```diff
@@ -143,10 +143,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
       None
     }
```
Same as with `BlockManagerReplicationSuite`, all changes can be reverted here as well.
```diff
@@ -93,7 +93,8 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
     val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
```
Here as well, revert all changes.
@abellina, given SPARK-45792 is an SPIP, can you please surface it on spark-dev@ and initiate a discussion on it? I don't remember seeing it there.
Thanks @mridulm, yes the commits make sense; it brings back the late initialization in the driver. I tested the change; the main difference from your patch @mridulm is that I had to still get the shuffle manager class names using the method we added to the
@tgravescs fyi
```diff
@@ -71,6 +70,12 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {

+  // We initialize the ShuffleManager later in SparkContext, and Executor, to allow
```
```diff
-  // We initialize the ShuffleManager later in SparkContext, and Executor, to allow
+  // We initialize the ShuffleManager later in SparkContext and Executor to allow
```
```diff
+  // SPARK-45762 introduces a change where the ShuffleManager is initialized later
+  // in the SparkContext and Executor, to allow for custom ShuffleManagers defined
+  // in user jars. In the executor, the BlockManager uses a lazy val to obtain the
+  // shuffleManager from the SparkEnv. In the driver, the SparkEnv's shuffleManager
```
I think this comment is no longer true. The driver SparkEnv shuffle manager is created after the plugin is initialized.
Thanks @tgravescs. Handled both comments here: 6d002a3
There were some CI failures around missing dependencies in the documentation build (all tests are passing otherwise), so I have upmerged. I also tweaked a couple of comments here: 5480faa
This looks good to me.
Merged to master.
Thanks @mridulm @tgravescs and @beliefer for the reviews and rework!
Hi, @abellina , @mridulm , @tgravescs, @beliefer .
Although this is a developer API, this is a documented one. Do you think we can avoid this breaking change by adding a new constructor instead?
@dongjoon-hyun, it is
In general, I am conflicted about trying to preserve compatibility for things which are clearly private to Spark - it inhibits the ability of the project to evolve, especially around major version boundaries (though we do have a few of these instances where we try to maintain compatibility). Given how long
Thanks for understanding @dongjoon-hyun !
…plugin is loaded

### What changes were proposed in this pull request?
This changes the initialization of `SparkEnv.memoryManager` to after the `DriverPlugin` is loaded, to allow the plugin to customize memory related configurations. A minor fix has been made to `Task` to make sure that it uses the same `BlockManager` throughout the task execution. Previously a different `BlockManager` could be used in some corner cases. Also added a test for the fix.

### Why are the changes needed?
Today, there is no way for a custom `DriverPlugin` to override memory configurations such as `spark.executor.memory`, `spark.executor.memoryOverhead`, `spark.memory.offheap.size` etc. This is because the memory manager is initialized before `DriverPlugin` is loaded. A similar change has been made to `shuffleManager` in #43627.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests. Also added new tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45052 from sunchao/SPARK-46947.

Authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Chao Sun <sunchao@apache.org>
…tor.userClassPathFirst=true with ShuffleManager defined in user jar

### What changes were proposed in this pull request?
`SparkShuffleManager` prints a warning log for `spark.executor.userClassPathFirst=true` with a `ShuffleManager` defined in a user jar via `--jars` or `spark.jars`.

### Why are the changes needed?
When `spark.executor.userClassPathFirst` is enabled with a ShuffleManager defined in a user jar, the `ClassLoader` of `handle` is `ChildFirstURLClassLoader`, which is different from `CelebornShuffleHandle`, whose `ClassLoader` is `AppClassLoader`, in `SparkShuffleManager#getWriter/getReader`. The local test log is as follows:

```
./bin/spark-sql --master yarn --deploy-mode client \
  --conf spark.celeborn.master.endpoints=localhost:9099 \
  --conf spark.executor.userClassPathFirst=true \
  --conf spark.jars=/tmp/celeborn-client-spark-3-shaded_2.12-0.5.0-SNAPSHOT.jar \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
  --conf spark.shuffle.service.enabled=false

./bin/spark-sql --master yarn --deploy-mode client --jars /tmp/celeborn-client-spark-3-shaded_2.12-0.5.0-SNAPSHOT.jar \
  --conf spark.celeborn.master.endpoints=localhost:9099 \
  --conf spark.executor.userClassPathFirst=true \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
  --conf spark.shuffle.service.enabled=false
```

```
24/04/28 18:03:31 [Executor task launch worker for task 0.0 in stage 5.0 (TID 8)] WARN SparkShuffleManager: [getWriter] handle classloader: org.apache.spark.util.ChildFirstURLClassLoader, CelebornShuffleHandle classloader: sun.misc.Launcher$AppClassLoader
```

This causes `SparkShuffleManager` to fall back to vanilla Spark `SortShuffleManager` for `spark.executor.userClassPathFirst=true` with a `ShuffleManager` defined in a user jar before apache/spark#43627. After [SPARK-45762](https://issues.apache.org/jira/browse/SPARK-45762), the `ClassLoader` of `handle` and `CelebornShuffleHandle` are both `ChildFirstURLClassLoader`:

```
24/04/28 18:03:31 [Executor task launch worker for task 0.0 in stage 5.0 (TID 8)] WARN SparkShuffleManager: [getWriter] handle classloader: org.apache.spark.util.ChildFirstURLClassLoader, CelebornShuffleHandle classloader: org.apache.spark.util.ChildFirstURLClassLoader
```

Therefore, `SparkShuffleManager` should print a warning log as a reminder for `spark.executor.userClassPathFirst=true` with a `ShuffleManager` defined in a user jar.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manual test.

Closes #2482 from SteNicholas/CELEBORN-1402.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
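The mismatch check behind that warning can be illustrated in isolation. This is a hedged sketch, not Celeborn's actual code; `checkHandleLoader` is a hypothetical helper:

```scala
// Sketch: report when the runtime handle was loaded by a different classloader
// than the expected handle class, rather than silently falling back. A class
// loaded twice by different loaders fails `isInstance` even with the same name.
def checkHandleLoader(handle: AnyRef, expected: Class[_]): Option[String] = {
  if (expected.isInstance(handle)) {
    None // compatible class/loader; no warning needed
  } else {
    Some(s"handle classloader: ${handle.getClass.getClassLoader}, " +
      s"expected classloader: ${expected.getClassLoader}")
  }
}
```

In the Celeborn scenario above, the mismatch arises because the handle was created by `ChildFirstURLClassLoader` while the comparing code's class came from `AppClassLoader`.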
What changes were proposed in this pull request?
As reported here https://issues.apache.org/jira/browse/SPARK-45762, `ShuffleManager` instances defined in a user jar cannot be used in all cases, unless specified in the `extraClassPath`. We would like to avoid adding extra configurations if this instance is already included in a jar passed via `--jars`.

Proposed changes:
Refactor code so we initialize the `ShuffleManager` later, after jars have been localized. This is especially necessary in the executor, where we would need to move this initialization until after the `replClassLoader` is updated with jars passed in `--jars`.

Before this change, the `ShuffleManager` is instantiated at `SparkEnv` creation. Having to instantiate the `ShuffleManager` this early doesn't work, because user jars have not been localized in all scenarios, and we will fail to load the `ShuffleManager` defined in `--jars`. We propose moving the `ShuffleManager` instantiation to `SparkContext` on the driver, and to `Executor`.
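The initialization-order change can be sketched in miniature. This is a hedged sketch with stand-in types (`ShuffleLike`, `Env`), not Spark's actual classes: the env is constructed without a shuffle manager, and a one-shot initializer runs later, once user jars are on the classpath:

```scala
// `ShuffleLike` stands in for ShuffleManager; the `create` thunk stands in for
// the classloading that must wait until user jars have been localized.
trait ShuffleLike
class UserJarShuffle extends ShuffleLike

class Env(create: () => ShuffleLike) {
  private var _shuffleManager: ShuffleLike = null

  def shuffleManager: ShuffleLike = _shuffleManager

  // Invoked from the driver (SparkContext) or Executor, after jars are available.
  // Guarded so it can only run once.
  def initializeShuffleManager(): Unit = {
    require(_shuffleManager == null, "shuffle manager already initialized")
    _shuffleManager = create()
  }
}
```

The key property is that constructing `Env` no longer forces the shuffle manager class to be loadable; only the later `initializeShuffleManager()` call does.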
.Why are the changes needed?
This is not a new API but a change of startup order. The changes are needed to improve the user experience by reducing extra configurations depending on how a Spark application is launched.
Does this PR introduce any user-facing change?
Yes, but it's backwards compatible. Users no longer need to specify a `ShuffleManager` jar in `extraClassPath`, but they are able to if they desire.

This change is not binary compatible with Spark 3.5.0 (see MiMa comments below). I have added a rule to MimaExcludes to handle it: 970bff4
How was this patch tested?
Added a unit test showing that a test `ShuffleManager` is available after `--jars` are passed, but not without (using local-cluster mode).

Tested manually with standalone mode, local-cluster mode, yarn client and cluster mode, and k8s.
Was this patch authored or co-authored using generative AI tooling?
No