Implement Spark executor node affinity using custom resource (#366)
* Implement Spark executor node affinity using custom resource

* reduce test flakiness

* Update document
pang-wu authored and kira-lin committed Jul 28, 2023
1 parent e7c2c74 commit f8ce299
Showing 7 changed files with 186 additions and 79 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/raydp.yml
@@ -105,8 +105,7 @@ jobs:
- name: Test with pytest
run: |
ray start --head --num-cpus 6
pytest python/raydp/tests/ -v -m"not error_on_custom_resource"
pytest python/raydp/tests/ -v -m"error_on_custom_resource"
pytest python/raydp/tests/ -v
ray stop --force
- name: Test Examples
run: |
@@ -1,7 +1,12 @@
package org.apache.spark.raydp;


public class SparkOnRayConfigs {
@Deprecated
public static final String RAY_ACTOR_RESOURCE_PREFIX = "spark.ray.actor.resource";

public static final String SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX =
"spark.ray.raydp_spark_executor.actor.resource";
public static final String SPARK_MASTER_ACTOR_RESOURCE_PREFIX =
"spark.ray.raydp_spark_master.actor.resource";
/**
@@ -10,8 +15,18 @@ public class SparkOnRayConfigs {
* This is different from spark.executor.cores, which defines the task parallelism
* inside a stage.
*/
@Deprecated
public static final String RAY_ACTOR_CPU_RESOURCE = RAY_ACTOR_RESOURCE_PREFIX + ".cpu";

/**
* CPU cores per Ray Actor which hosts the Spark executor; this resource is used
* for scheduling. Default value is 1.
* This is different from spark.executor.cores, which defines the task parallelism
* inside a stage.
*/
public static final String SPARK_EXECUTOR_ACTOR_CPU_RESOURCE =
SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX + ".cpu";

public static final int DEFAULT_SPARK_CORES_PER_EXECUTOR = 1;

// For below log4j related configs, there are multiple JVM processes
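As a usage-level illustration of the key rename (a minimal sketch, not part of this commit; the app name and sizing below are made up), the new key replaces the deprecated one and takes precedence over it when both are set, per the scheduler backend change further below:

```python
import raydp

spark = raydp.init_spark(
    app_name="cpu_key_example",   # hypothetical name
    num_executors=1,
    executor_cores=1,
    executor_memory="500M",
    configs={
        # New, preferred key: logical CPUs each executor actor requests from Ray.
        "spark.ray.raydp_spark_executor.actor.resource.cpu": "1",
        # Deprecated equivalent, still honored for backward compatibility:
        # "spark.ray.actor.resource.cpu": "1",
    })
```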
@@ -260,7 +260,9 @@ class RayAppMaster(host: String,
val executorId = s"${appInfo.getNextExecutorId()}"

logInfo(s"Requesting Spark executor with Ray logical resource " +
s"{ CPU: ${rayActorCPU} }..")
s"{ CPU: ${rayActorCPU}, " +
s"${appInfo.desc.resourceReqsPerExecutor
.map{ case (name, amount) => s"${name}: ${amount}"}.mkString(", ")} }..")
// TODO: Support generic fractional logical resources using prefix spark.ray.actor.resource.*

val handler = RayExecutorUtils.createExecutorActor(
@@ -269,7 +271,8 @@
memory,
// This won't work: Spark expects integers in custom resources;
// please see the Python test test_spark_on_fractional_custom_resource
appInfo.desc.resourceReqsPerExecutor.map(pair => (pair._1, Double.box(pair._2))).asJava,
appInfo.desc.resourceReqsPerExecutor
.map{ case (name, amount) => (name, Double.box(amount))}.asJava,
placementGroup,
getNextBundleIndex,
seqAsJavaList(appInfo.desc.command.javaOpts))
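To make the effect of this change concrete — each executor actor now requests the user-specified custom resources from Ray — here is a hedged verification sketch (the resource name, amounts, and app name are all made up; `ray.available_resources()` is standard Ray API):

```python
import ray
import raydp

# Start a local cluster that advertises a custom resource.
ray.init(resources={"spark_executor": 100})
before = ray.available_resources().get("spark_executor", 0)

spark = raydp.init_spark("affinity_check", 1, 1, "500M", configs={
    "spark.ray.raydp_spark_executor.actor.resource.spark_executor": "1",
})
# Run a trivial job so the executor actor is actually launched.
spark.range(10).count()

after = ray.available_resources().get("spark_executor", 0)
# Expect roughly 1 unit held by the executor actor once it is up.
print(before - after)

raydp.stop_spark()
ray.shutdown()
```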
@@ -27,7 +27,7 @@ import scala.concurrent.Future

import io.ray.api.{ActorHandle, Ray}

import org.apache.spark.{RayDPException, SparkConf, SparkContext}
import org.apache.spark.{RayDPException, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.raydp._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.{config, Logging}
@@ -167,12 +167,16 @@ class RayCoarseGrainedSchedulerBackend(

val executorResourceReqs = ResourceUtils.parseResourceRequirements(
conf, config.SPARK_EXECUTOR_PREFIX)
val resourcesInMap = transferResourceRequirements(executorResourceReqs)
val raydpExecutorCustomResources = parseRayDPResourceRequirements(conf)

val resourcesInMap = transferResourceRequirements(executorResourceReqs) ++
raydpExecutorCustomResources
val numExecutors = conf.get(config.EXECUTOR_INSTANCES).get
val sparkCoresPerExecutor = coresPerExecutor
.getOrElse(SparkOnRayConfigs.DEFAULT_SPARK_CORES_PER_EXECUTOR)
val rayActorCPU = conf.get(SparkOnRayConfigs.RAY_ACTOR_CPU_RESOURCE,
sparkCoresPerExecutor.toString).toDouble
val rayActorCPU = conf.get(SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE,
conf.get(SparkOnRayConfigs.RAY_ACTOR_CPU_RESOURCE,
sparkCoresPerExecutor.toString)).toDouble

val appDesc = ApplicationDescription(name = sc.appName, numExecutors = numExecutors,
coresPerExecutor = coresPerExecutor, memoryPerExecutorMB = sc.executorMemory,
@@ -196,6 +200,21 @@
}
}

def parseRayDPResourceRequirements(sparkConf: SparkConf): Map[String, Double] = {
sparkConf.getAllWithPrefix(
s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE}.")
.filter{ case (key, _) => key.toLowerCase() != "cpu" }
.map{ case (key, _) => key }
.distinct
.map(name => {
val amountDouble = sparkConf.get(
s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE}.${name}",
0d.toString).toDouble
name->amountDouble
})
.toMap
}

private def transferResourceRequirements(
requirements: Seq[ResourceRequirement]): HashMap[String, Double] = {
val results = HashMap[String, Double]()
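For readers who don't follow Scala, here is a rough Python rendering of what `parseRayDPResourceRequirements` does (illustrative only; the config dict below is hypothetical):

```python
EXECUTOR_RESOURCE_PREFIX = "spark.ray.raydp_spark_executor.actor.resource"

def parse_raydp_resource_requirements(spark_conf: dict) -> dict:
    """Collect executor custom resources; CPU is handled by its own config key."""
    resources = {}
    for key, value in spark_conf.items():
        if not key.startswith(EXECUTOR_RESOURCE_PREFIX + "."):
            continue
        name = key[len(EXECUTOR_RESOURCE_PREFIX) + 1:]
        if name.lower() == "cpu":
            continue  # CPU goes through spark.ray.raydp_spark_executor.actor.resource.cpu
        resources[name] = float(value)
    return resources

# Hypothetical input and result:
conf = {"spark.ray.raydp_spark_executor.actor.resource.spark_executor": "1"}
assert parse_raydp_resource_requirements(conf) == {"spark_executor": 1.0}
```

These parsed custom resources are merged with the regular Spark executor resource requirements, while the CPU amount itself is read from the new `spark.ray.raydp_spark_executor.actor.resource.cpu` key first, falling back to the deprecated `spark.ray.actor.resource.cpu`.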
73 changes: 66 additions & 7 deletions doc/spark_on_ray.md
@@ -1,16 +1,23 @@
### Spark master node affinity
### Spark master actors node affinity

RayDP will create a Ray actor called `RayDPSparkMaster`, which will then launch the Java process, acting like a Master in a traditional Spark cluster. By default, this actor can be scheduled to any node in the Ray cluster. If you want it to be on a particular node, you can assign some custom resources to that node, and request those resources when starting `RayDPSparkMaster` by setting `spark.ray.raydp_spark_master.actor.resource.*` in `init_spark`.
RayDP will create a Ray actor called `RayDPSparkMaster`, which will then launch the Java process,
acting like a Master in a traditional Spark cluster.
By default, this actor can be scheduled to any node in the Ray cluster.
If you want it to be on a particular node, you can assign some custom resources to that node,
and request those resources when starting `RayDPSparkMaster` by setting
`spark.ray.raydp_spark_master.actor.resource.*` in `init_spark`.

As an example:

```python
import raydp

raydp.init_spark(...,
configs = {
# ... other configs
'spark.ray.raydp_spark_master.actor.resource.CPU': 0,
'spark.ray.raydp_spark_master.actor.resource.spark_master': 1, # Force Spark driver related actor run on headnode
})
configs = {
# ... other configs
'spark.ray.raydp_spark_master.actor.resource.CPU': 0,
'spark.ray.raydp_spark_master.actor.resource.spark_master': 1, # Force Spark driver related actor run on headnode
})
```

In cluster config yaml:
@@ -22,6 +29,58 @@ available_node_types:
spark_master: 100 # Just gave it a large enough number so all drivers are there
```
### Spark executor actors node affinity
Similar to the master actor's node affinity, you can also schedule Spark executors onto a specific set of nodes
using custom resources, via the configuration `spark.ray.raydp_spark_executor.actor.resource.[RESOURCE_NAME]`:

```python
import raydp
spark = raydp.init_spark(...,
                         configs = {
                             # ...
                             'spark.ray.raydp_spark_executor.actor.resource.spark_executor': 1, # Schedule executors on nodes with the custom resource spark_executor
                         })
```

And here is the cluster YAML with the custom resource:

```yaml
# ... other Ray cluster config
available_node_types:
  spark_on_spot: # Spark-only nodes
    resources:
      spark_executor: 100 # custom resource; the name must match the one set in spark.ray.raydp_spark_executor.actor.resource.*
    min_workers: 2
    max_workers: 10 # changing this also requires changing the global max_workers
    node_config:
      # ....
  general_spot: # Nodes for general Ray workloads
    min_workers: 2
    max_workers: 10 # changing this also requires changing the global max_workers
    node_config:
      # ...
```

One thing worth noting is that you can use `spark.ray.raydp_spark_executor.actor.resource.cpu` to oversubscribe
CPU resources by setting the logical CPU count lower than the number of cores per executor. In this case, you can
schedule more executor cores than the total number of vCPUs on a node, which is useful if your workload is not CPU bound:

```python
import raydp
spark = raydp.init_spark(app_name='RayDP Oversubscribe Example',
                         num_executors=1,
                         executor_cores=3, # The executor can run 3 tasks in parallel
                         executor_memory=1 * 1024 * 1024 * 1024,
                         configs = {
                             # ...
                             'spark.ray.raydp_spark_executor.actor.resource.cpu': 1, # The actor only occupies 1 logical CPU slot in Ray
                         })
```


### External Shuffle Service & Dynamic Resource Allocation

RayDP supports the External Shuffle Service. To enable it, you can either set `spark.shuffle.service.enabled` to `true` in `spark-defaults.conf`, or you can provide a config to `raydp.init_spark`, as shown below:
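The documentation's own snippet is cut off in this view; a minimal sketch of the `init_spark` variant (assuming only the standard Spark property; app name and sizing are made up) might look like:

```python
import raydp

spark = raydp.init_spark(
    app_name="shuffle_service_example",   # hypothetical name
    num_executors=2,
    executor_cores=1,
    executor_memory="500M",
    configs={
        # Enable Spark's external shuffle service for the RayDP executors.
        "spark.shuffle.service.enabled": "true",
    })
```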
61 changes: 12 additions & 49 deletions python/raydp/tests/conftest.py
@@ -16,14 +16,14 @@
#

import logging
import subprocess
import time

import pytest
import pyspark
from pyspark.sql import SparkSession
import pytest
import ray
import raydp
import subprocess
from ray.cluster_utils import Cluster
from pyspark.sql import SparkSession


def quiet_logger():
@@ -60,18 +60,20 @@ def spark_on_ray_small(request):
else:
ray.init(address=request.param)
node_ip = ray.util.get_node_ip_address()
spark = raydp.init_spark("test", 1, 1, "500M", configs= {
spark = raydp.init_spark("test", 1, 1, "500M", configs={
"spark.driver.host": node_ip,
"spark.driver.bindAddress": node_ip
})

def stop_all():
raydp.stop_spark()
time.sleep(5)
ray.shutdown()

request.addfinalizer(stop_all)
return spark


@pytest.fixture(scope="function", params=["local", "ray://localhost:10001"])
def spark_on_ray_2_executors(request):
ray.shutdown()
@@ -80,59 +82,19 @@ def spark_on_ray_2_executors(request):
else:
ray.init(address=request.param)
node_ip = ray.util.get_node_ip_address()
spark = raydp.init_spark("test", 2, 1, "500M", configs= {
spark = raydp.init_spark("test", 2, 1, "500M", configs={
"spark.driver.host": node_ip,
"spark.driver.bindAddress": node_ip
})

def stop_all():
raydp.stop_spark()
time.sleep(5)
ray.shutdown()

request.addfinalizer(stop_all)
return spark


@pytest.fixture(scope="function")
def spark_on_ray_fraction_custom_resource(request):
ray.shutdown()
cluster = Cluster(
initialize_head=True,
head_node_args={
"num_cpus": 2
})
ray.init(address=cluster.address)

def stop_all():
raydp.stop_spark()
ray.shutdown()

request.addfinalizer(stop_all)


@pytest.fixture(scope="function")
def spark_on_ray_fractional_cpu(request):
ray.shutdown()
cluster = Cluster(
initialize_head=True,
head_node_args={
"num_cpus": 2
})

ray.init(address=cluster.address)

spark = raydp.init_spark(app_name="test_cpu_fraction",
num_executors=1, executor_cores=3, executor_memory="500M",
configs={"spark.ray.actor.resource.cpu": "0.1"})

def stop_all():
raydp.stop_spark()
ray.shutdown()

request.addfinalizer(stop_all)
return spark


@pytest.fixture(scope='session')
def custom_spark_dir(tmp_path_factory) -> str:
working_dir = tmp_path_factory.mktemp("spark").as_posix()
@@ -153,7 +115,8 @@ def custom_spark_dir(tmp_path_factory) -> str:

import wget

wget.download(f"https://archive.apache.org/dist/spark/spark-{pyspark.__version__}/{spark_distribution}.{file_extension}",
spark_distribution_file)
wget.download(
f"https://archive.apache.org/dist/spark/spark-{pyspark.__version__}/{spark_distribution}.{file_extension}",
spark_distribution_file)
subprocess.check_output(['tar', 'xzvf', spark_distribution_file, '--directory', working_dir])
return f"{working_dir}/{spark_distribution}"
