Conversation
Force-pushed 4f6f75a to 988db3b
shufflePodCache.get(executorNode) match {
  case Some(pod) => pod
  case _ =>
    throw new SparkException(s"Unable to find shuffle pod on node $executorNode")
A corner case comes to mind: if a shuffle pod for a node died and is being restarted, and at that moment a new executor on that node registers with the driver, it would crash the driver.
Can we improve this, e.g. let the executor die when the shuffle pod is not ready, instead of throwing a SparkException that aborts the driver?
Nice catch! I think you're right. Done. Passing back an empty string which should make the executor crash.
Throwing the exception here should be fine, right? This is being processed on a separate thread in the RPC environment. Thus the exception here should only propagate to the executor that asked for this configuration.
I could be wrong, but IIUC SparkException is thrown whenever unrecoverable errors happen.
In this case it's less important what the type of exception is. What's important is where the exception is thrown from and where it propagates to.
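(For reference, a minimal sketch of the fallback settled on above: returning an empty address so that only the requesting executor fails. Names like shufflePodCache and getShufflePodAddress are illustrative, not the exact code in this PR.)

def getShufflePodAddress(executorNode: String): String = {
  shufflePodCache.get(executorNode) match {
    case Some(pod) =>
      pod.getStatus.getPodIP
    case None =>
      // Returning an empty address lets the requesting executor fail on its own
      // instead of propagating an exception that could take down the driver.
      logWarning(s"No shuffle pod found on node $executorNode")
      ""
  }
}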
      addShufflePodToCache(p)
    }
  }
  override def onClose(e: KubernetesClientException): Unit = {}
Is a daemonset watchable? If so, can we watch it directly instead of using labels?
It is, but we don't want to assume that a daemonset is in use. With the current approach it stays generic: any pod co-located on that node with the matching labels will do.
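(A rough sketch of the label-based watch described above, using the fabric8 client; shuffleLabels and the cache helpers are assumed names.)

import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}

// Watch any pod carrying the configured shuffle labels, regardless of whether it
// was created by a daemonset, a deployment, or something else on that node.
client.pods().withLabels(shuffleLabels.asJava).watch(new Watcher[Pod] {
  override def eventReceived(action: Watcher.Action, p: Pod): Unit = action match {
    case Watcher.Action.ADDED | Watcher.Action.MODIFIED => addShufflePodToCache(p)
    case Watcher.Action.DELETED | Watcher.Action.ERROR => removeShufflePodFromCache(p)
  }
  override def onClose(e: KubernetesClientException): Unit = {}
})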
"shuffle-labels") | ||
val shuffleDirs = conf.getOption(KUBERNETES_SHUFFLE_DIR.key).map { | ||
_.split(",") | ||
}.getOrElse(Utils.getConfiguredLocalDirs(conf)) |
Should we throw an exception here if dynamic allocation is enabled but shuffle.labels is empty?
Done
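(A sketch of the guard being asked for, assuming the config key names visible elsewhere in this PR and Spark's Utils.isDynamicAllocationEnabled helper.)

// Fail fast if the shuffle service is expected but no labels identify the shuffle pods.
if (Utils.isDynamicAllocationEnabled(conf) &&
    conf.get(KUBERNETES_SHUFFLE_LABELS.key, "").isEmpty) {
  throw new SparkException("Dynamic allocation enabled " +
    s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
}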
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils

private[spark] class NodeCacheManager (
nit: the name NodeCacheManager is not that intuitive. Maybe something like ShufflePodsCatalog? I don't know...
Changed to ShufflePodCache
Force-pushed fc70821 to bccf43b
Reviewed the top level PR, will follow up on changes in the subsequent individual commits.
private val allocatorRunnable: Runnable = new Runnable {
  override def run(): Unit = {
    if (runningExecutorPods.size - totalRegisteredExecutors.get() > 0) {
Could probably just use < for clarity.
override def run(): Unit = {
  if (runningExecutorPods.size - totalRegisteredExecutors.get() > 0) {
    logDebug("Waiting for pending executors before scaling")
    return
Try to structure the logic so that we don't use return.
if (totalExpectedExecutors.get() <= runningExecutorPods.size) {
  logDebug(
    "Maximum allowed executor limit reached. Not scaling up further.")
  return
Similarly here - avoid return.
 * KubernetesAllocator class watches executor registrations, limits
 * and creates new executors when it is appropriate.
 */
private[spark] class KubernetesAllocator(client: KubernetesClient)
Does this specifically need to be a separate class? The code could just be inlined in the scheduler backend class.
I like the separate class for the separation it offers for this allocator mechanism. Do you strongly prefer not having it?
This ought to be in a separate file then, I think. But if this also requires a backwards reference to the scheduler backend (e.g. to access fields) then this should just be inlined.
It requires access to totalRegisteredExecutors, which is a protected field in CoarseGrainedSchedulerBackend, and a couple of other accounting fields from the KubernetesClusterSchedulerBackend.
Oh, did you mean that it shouldn't be a separate class?
Done
def start(): Unit = {
  // seed the initial cache.
  val pods = client.pods().withLabels(dsLabels.asJava).list()
  for (pod <- pods.getItems.asScala) {
I think we prefer foreach over for in general.
Done
@@ -81,6 +82,68 @@ private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
  })
}

private def expectationsForDynamicAllocation(sparkMetricsService: SparkRestApiV1): Unit = {
Let's avoid adding tests to V1 and solely focus on V2.
Done
    .endContainer()
    .endSpec()

var resolvedPodBuilder = shuffleServiceConfig
val, not var.
Done
import org.apache.spark.sql.SparkSession

object GroupByTest {
  def main(args: Array[String]) {
Would be good to confirm that this test is creating multiple executors and is writing files to the shuffle service. I'm not sure if we can do this in an automated way.
We can achieve that by setting --conf spark.dynamicAllocation.minExecutors=2 and waiting for these two executors to be ready via the k8s API (or the Spark REST API?).
The executors can spin up but not write any shuffle data to disk. We should check that shuffle data is being written to the disks.
I chose this test so that it would have shuffle data being written to disk. I've manually verified that it does write to disk.
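(A minimal sketch of a groupBy workload along these lines; the sizes below are illustrative, chosen only so that map output is shuffled across executors.)

import java.util.Random

import org.apache.spark.sql.SparkSession

object GroupByTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("GroupBy Test").getOrCreate()
    val (numMappers, numKVPairs, valSize, numReducers) = (5, 1000, 1000, 2)

    // Generate random key-value pairs on each mapper partition.
    val pairs = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { _ =>
      val ranGen = new Random
      (0 until numKVPairs).map { _ =>
        val arr = new Array[Byte](valSize)
        ranGen.nextBytes(arr)
        (ranGen.nextInt(Int.MaxValue), arr)
      }
    }.cache()
    pairs.count()  // force evaluation so the data is cached before the shuffle below
    println(pairs.groupByKey(numReducers).count())  // groupByKey forces a shuffle to disk
    spark.stop()
  }
}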
}

def start(): Unit = {
  if (interval > 0) {
Just make the interval an Optional configuration and use interval.foreach, as opposed to checking whether an integer is greater than zero. We should then validate that any given value is positive and throw an exception if it isn't.
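(A sketch of the Option-based scheduling suggested here; interval, scheduler, and refreshTask are assumed names for an Option[Long], a ScheduledExecutorService, and a Runnable respectively.)

import java.util.concurrent.TimeUnit

// Only schedule when an interval was configured, and reject non-positive values.
interval.foreach { delaySeconds =>
  require(delaySeconds > 0, s"Interval should be a positive number of seconds, got $delaySeconds")
  scheduler.scheduleWithFixedDelay(refreshTask, 0, delaySeconds, TimeUnit.SECONDS)
}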
s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified") | ||
} | ||
|
||
val shuffleDirs = conf.getOption(KUBERNETES_SHUFFLE_DIR.key).map { |
Can we use .get instead of .getOption here?
I was trying to get Option[String] because if the shuffle directory is left empty, we use the default from Utils.getConfiguredLocalDirs(conf). I'm not sure how we can get this behavior using get.
Using .get here without using .key on the configuration key should give back an Option.
Ah.. good point. Done
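(A sketch of the difference being pointed out, assuming KUBERNETES_SHUFFLE_DIR is declared with createOptional so the typed getter already returns an Option[String].)

val shuffleDirs = conf.get(KUBERNETES_SHUFFLE_DIR)  // typed entry: yields Option[String], no .key needed
  .map(_.split(","))
  .getOrElse(Utils.getConfiguredLocalDirs(conf))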
Force-pushed 3ac475f to 5264fad
@mccheah Addressed all comments. PTAL
Force-pushed 5264fad to 4400a8c
Has the unit testing changed? I'm seeing failures in files I did not touch at all.
rerun unit tests please
@foxish I've been working on an SBT-based unit test build in jenkins and it looks like it was racing with the current maven-based unit tests. I've disabled the new test build and expect just the old one to be running now. Sorry about that!
@ash211, we can fix the new one. The errors appeared to be:
Sending a PR to fix these.
Ah! Okay, SG. Thanks!
Force-pushed 2f05ac0 to a861849
Force-pushed a861849 to c87008d
Would it be possible to add unit-level tests around these? It would be great if we can start hardening the features we are implementing here. Unit testing these kinds of things can be difficult; we would probably have to refactor much of the scheduler backend and the shuffle pod cache to be able to verify the things that are important.
  Some(conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE))
} else {
  throw new SparkException(s"Allocation batch size ${KUBERNETES_ALLOCATION_BATCH_SIZE} " +
    s"should be a positive integer")
Add what value the user specified.
@@ -130,12 +197,27 @@ private[spark] class KubernetesClusterSchedulerBackend(
  super.start()
  executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId())
    .watch(new ExecutorPodsWatcher()))

  podAllocationInterval.foreach(allocator.scheduleWithFixedDelay(allocatorRunnable,
    0,
Put allocatorRunnable, 0, TimeUnit.SECONDS all on this line.
I think since podAllocationInterval is now always going to be provided (we always set it to Some(...) or throw an exception), this thread will always be running. Is this the intended behavior? If so, there's no need to use foreach and options here.
Force-pushed c87008d to 2b5bba0
private val allocatorRunnable: Runnable = new Runnable {
  override def run(): Unit = {
    if (totalRegisteredExecutors.get() >= runningExecutorPods.size) {
      if (totalExpectedExecutors.get() > runningExecutorPods.size) {
Would be cleaner I think to use if...else if...else here:
if (...) {
logDebug("Maximum allowed executor limit...")
} else if (...) {
logDebug("Waiting for pending...")
} else {
// Actual logic
}
Done. Thanks! Trying to add a couple of unit tests to ShufflePodCache now and a mechanism that might help us add tests easily in the future.
Force-pushed 2b5bba0 to 26805ed
@@ -105,6 +131,44 @@ private[spark] class KubernetesClusterSchedulerBackend(

private val initialExecutors = getInitialTargetExecutorNumber(1)

private val podAllocationInterval =
  if (conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) > 0) {
There's probably no need to make this an option - just assign podAllocationInterval directly, then check the variable and throw the SparkException immediately afterwards.
Done
import org.apache.spark.SparkException

object CommandLineUtils {
This should be KeyValueUtils? This doesn't seem related to the command line.
I envisioned it as a place for utility functions related to command-line options, of which we could have more in the future. The key-value parsing is needed because these values are supplied as command-line strings.
Is it confusing?
This seems to mainly be used to parse out labels and annotations from SparkConf values - the command line doesn't seem to be related to that.
I see, I was assuming the primary way of supplying those args was via the command line. Okay, how about ConfigurationUtils? KeyValueUtils.parseKeyValuePairs() just seems a bit redundant.
ConfigurationUtils is fine.
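(A rough shape for the renamed helper; the signature below is a guess at what parseKeyValuePairs could look like, not the code from this PR.)

import org.apache.spark.SparkException

object ConfigurationUtils {
  // Parses a comma-separated "key=value,key2=value2" string from a SparkConf entry into a Map,
  // failing fast with the offending config key in the message when an entry is malformed.
  def parseKeyValuePairs(
      maybeKeyValues: Option[String],
      configKey: String,
      keyValueType: String): Map[String, String] = {
    maybeKeyValues.map { pairs =>
      pairs.split(",").map(_.trim).filter(_.nonEmpty).map { pair =>
        pair.split("=", 2) match {
          case Array(k, v) => (k, v)
          case _ => throw new SparkException(
            s"Malformed $keyValueType entry '$pair' in configuration $configKey")
        }
      }.toMap
    }.getOrElse(Map.empty)
  }
}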
Force-pushed 26805ed to b377fa6
Unit tests seem more complex than expected, because of watchers and such. https://mvnrepository.com/artifact/io.fabric8/kubernetes-server-mock provided an easy beginning, but I think I'll take it up separately instead of blocking experiments using dynamic allocation.
We can probably test the watches separately and just ensure that if the watch receives an event then the scheduler responds accordingly.
The mock server can be taught to expect the watch calls and respond appropriately. I used a similar thing in the unit tests here.
Created #275, will follow up there
Force-pushed b377fa6 to 6ec3d59
Updated docs, any other comments?
There are a few minor style things, but they can be addressed either here or at some other point. Someone else can make a final pass before merging, but if there are no objections before the end of the day then feel free to proceed with the merge.
@@ -105,6 +131,40 @@ private[spark] class KubernetesClusterSchedulerBackend(

private val initialExecutors = getInitialTargetExecutorNumber(1)

private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
if (podAllocationInterval <= 0) {
We can use require here and in other similar places.
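(A sketch of the require-based check, also folding in the earlier ask to include the user-supplied value in the message.)

private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
require(podAllocationInterval > 0,
  s"${KUBERNETES_ALLOCATION_BATCH_DELAY.key} should be a positive integer, " +
  s"got $podAllocationInterval")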
val runningExecutorPod = kubernetesClient
  .pods()
  .withName(
    runningExecutorPods(executorId).getMetadata.getName)
Move up to previous line.
import org.apache.spark.internal.Logging

private[spark] class ShufflePodCache (
    val client: KubernetesClient,
These don't have to be vals, and it's preferred that they aren't, since vals are accessible from outside the scope of this class.
Force-pushed 6ec3d59 to 4dd4715
Addressed comments. Will merge after tests pass.
I think this dynamic allocation PR should go in first, then the init containers one afterwards. That way the executor recovery PR can start making progress, given that it's also blocked on this PR merging.
Okay, SG. Merging this now, as tests passed.
* dynamic allocation: shuffle service docker, yaml and test fixture
* dynamic allocation: changes to spark-core
* dynamic allocation: tests
* dynamic allocation: docs
* dynamic allocation: kubernetes allocator and executor accounting
* dynamic allocation: shuffle service, node caching
…ogging Force commons-logging version to avoid conflicts
Dynamic allocation updated
Please see commits individually during review for clarity
cc @mccheah @ash211 @varunkatta @apache-spark-on-k8s/contributors