Implement Spark executor node affinity using custom resource #366

Merged: 3 commits, Jul 25, 2023
3 changes: 1 addition & 2 deletions .github/workflows/raydp.yml
@@ -105,8 +105,7 @@ jobs:
- name: Test with pytest
run: |
ray start --head --num-cpus 6
pytest python/raydp/tests/ -v -m"not error_on_custom_resource"
pytest python/raydp/tests/ -v -m"error_on_custom_resource"
pytest python/raydp/tests/ -v
ray stop --force
- name: Test Examples
run: |
SparkOnRayConfigs.java
@@ -1,7 +1,12 @@
package org.apache.spark.raydp;


public class SparkOnRayConfigs {
@Deprecated
public static final String RAY_ACTOR_RESOURCE_PREFIX = "spark.ray.actor.resource";

public static final String SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX =
"spark.ray.raydp_spark_executor.actor.resource";
public static final String SPARK_MASTER_ACTOR_RESOURCE_PREFIX =
"spark.ray.raydp_spark_master.actor.resource";
/**
@@ -10,8 +15,18 @@ public class SparkOnRayConfigs {
* This is different from spark.executor.cores, which defines the task parallelism
* inside a stage.
*/
@Deprecated
public static final String RAY_ACTOR_CPU_RESOURCE = RAY_ACTOR_RESOURCE_PREFIX + ".cpu";

/**
* CPU cores per Ray actor that hosts the Spark executor; this resource is used
* for scheduling. Default value is 1.
* This is different from spark.executor.cores, which defines the task parallelism
* inside a stage.
*/
public static final String SPARK_EXECUTOR_ACTOR_CPU_RESOURCE =
SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX + ".cpu";

public static final int DEFAULT_SPARK_CORES_PER_EXECUTOR = 1;

// For below log4j related configs, there are multiple JVM processes
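The executor-scoped keys above supersede the deprecated `spark.ray.actor.resource.*` prefix. As a minimal sketch of the user-facing change, assuming illustrative (non-default) values:

```python
import raydp

# Hypothetical usage: the new key introduced by this PR sets the logical CPU
# each executor actor reserves from Ray; the deprecated key below expressed
# the same thing before this change and is kept for backwards compatibility.
spark = raydp.init_spark(
    app_name="affinity_example",
    num_executors=2,
    executor_cores=2,
    executor_memory="1G",
    configs={
        "spark.ray.raydp_spark_executor.actor.resource.cpu": "1",
        # "spark.ray.actor.resource.cpu": "1",  # deprecated equivalent
    },
)
```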
RayAppMaster.scala
@@ -260,7 +260,9 @@ class RayAppMaster(host: String,
val executorId = s"${appInfo.getNextExecutorId()}"

logInfo(s"Requesting Spark executor with Ray logical resource " +
s"{ CPU: ${rayActorCPU} }..")
s"{ CPU: ${rayActorCPU}, " +
s"${appInfo.desc.resourceReqsPerExecutor
.map{ case (name, amount) => s"${name}: ${amount}"}.mkString(", ")} }..")
// TODO: Support generic fractional logical resources using prefix spark.ray.actor.resource.*

val handler = RayExecutorUtils.createExecutorActor(
@@ -269,7 +271,8 @@
memory,
// This won't work: Spark expects integers in custom resources,
// please see the Python test test_spark_on_fractional_custom_resource
appInfo.desc.resourceReqsPerExecutor.map(pair => (pair._1, Double.box(pair._2))).asJava,
appInfo.desc.resourceReqsPerExecutor
.map{ case (name, amount) => (name, Double.box(amount))}.asJava,
placementGroup,
getNextBundleIndex,
seqAsJavaList(appInfo.desc.command.javaOpts))
RayCoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import scala.concurrent.Future

import io.ray.api.{ActorHandle, Ray}

import org.apache.spark.{RayDPException, SparkConf, SparkContext}
import org.apache.spark.{RayDPException, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.raydp._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.{config, Logging}
@@ -167,12 +167,16 @@ class RayCoarseGrainedSchedulerBackend(

val executorResourceReqs = ResourceUtils.parseResourceRequirements(
conf, config.SPARK_EXECUTOR_PREFIX)
val resourcesInMap = transferResourceRequirements(executorResourceReqs)
val raydpExecutorCustomResources = parseRayDPResourceRequirements(conf)

val resourcesInMap = transferResourceRequirements(executorResourceReqs) ++
raydpExecutorCustomResources
val numExecutors = conf.get(config.EXECUTOR_INSTANCES).get
val sparkCoresPerExecutor = coresPerExecutor
.getOrElse(SparkOnRayConfigs.DEFAULT_SPARK_CORES_PER_EXECUTOR)
val rayActorCPU = conf.get(SparkOnRayConfigs.RAY_ACTOR_CPU_RESOURCE,
sparkCoresPerExecutor.toString).toDouble
val rayActorCPU = conf.get(SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE,
conf.get(SparkOnRayConfigs.RAY_ACTOR_CPU_RESOURCE,
sparkCoresPerExecutor.toString)).toDouble

val appDesc = ApplicationDescription(name = sc.appName, numExecutors = numExecutors,
coresPerExecutor = coresPerExecutor, memoryPerExecutorMB = sc.executorMemory,
@@ -196,6 +200,21 @@
}
}

  def parseRayDPResourceRequirements(sparkConf: SparkConf): Map[String, Double] = {
    // Collect every spark.ray.raydp_spark_executor.actor.resource.[name] entry;
    // "cpu" is excluded because it is handled separately as the actor's logical CPU.
    sparkConf.getAllWithPrefix(
        s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX}.")
      .filter{ case (key, _) => key.toLowerCase() != "cpu" }
      .map{ case (key, _) => key }
      .distinct
      .map(name => {
        val amountDouble = sparkConf.get(
          s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX}.${name}",
          0d.toString).toDouble
        name -> amountDouble
      })
      .toMap
  }

private def transferResourceRequirements(
requirements: Seq[ResourceRequirement]): HashMap[String, Double] = {
val results = HashMap[String, Double]()
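The `rayActorCPU` lookup above resolves the new executor-scoped key first, falls back to the deprecated key, and finally to the Spark cores per executor. A small Python sketch of that precedence; the helper name is hypothetical, for illustration only:

```python
def resolve_ray_actor_cpu(conf: dict, spark_cores_per_executor: int = 1) -> float:
    # The deprecated key (or, failing that, the core count) supplies the
    # default for the new executor-scoped key.
    fallback = conf.get("spark.ray.actor.resource.cpu",
                        str(spark_cores_per_executor))
    return float(conf.get("spark.ray.raydp_spark_executor.actor.resource.cpu",
                          fallback))

# Only the deprecated key is set, so it wins over the core-count default:
print(resolve_ray_actor_cpu({"spark.ray.actor.resource.cpu": "0.5"}))  # 0.5
```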
73 changes: 66 additions & 7 deletions doc/spark_on_ray.md
@@ -1,16 +1,23 @@
### Spark master actors node affinity

RayDP will create a Ray actor called `RayDPSparkMaster`, which will then launch the Java process,
acting like a Master in a traditional Spark cluster.
By default, this actor can be scheduled to any node in the Ray cluster.
If you want it to be on a particular node, you can assign some custom resources to that node,
and request those resources when starting `RayDPSparkMaster` by setting
`spark.ray.raydp_spark_master.actor.resource.*` in `init_spark`.

As an example:

```python
import raydp

raydp.init_spark(...,
    configs = {
        # ... other configs
        'spark.ray.raydp_spark_master.actor.resource.CPU': 0,
        'spark.ray.raydp_spark_master.actor.resource.spark_master': 1, # Force Spark driver related actors onto the head node
    })
```

In the cluster config YAML:
@@ -22,6 +29,58 @@ available_node_types:
      spark_master: 100 # Just give it a large enough number so all drivers land there
```

### Spark executor actors node affinity

Similar to the master actor's node affinity, you can also schedule Spark executors onto a specific set of nodes
using custom resources, via the configuration `spark.ray.raydp_spark_executor.actor.resource.[RESOURCE_NAME]`:

```python
import raydp

spark = raydp.init_spark(...,
    configs = {
        # ...
        'spark.ray.raydp_spark_executor.actor.resource.spark_executor': 1, # Schedule executor on nodes with custom resource spark_executor
    })
```

And here is the cluster YAML with the custom resource:

```yaml
# ... other Ray cluster config
available_node_types:
  spark_on_spot: # Spark-only nodes
    resources:
      spark_executor: 100 # custom resource, with a name matching the one set in spark.ray.raydp_spark_executor.actor.resource.*
    min_workers: 2
    max_workers: 10 # changing this also requires changing the global max_workers
    node_config:
      # ....
  general_spot: # Nodes for general Ray workloads
    min_workers: 2
    max_workers: 10 # changing this also requires changing the global max_workers
    node_config:
      # ...
```
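If you are experimenting on a single machine rather than provisioning through a cluster YAML, the same custom resource can be attached to a node when Ray starts; a hypothetical local equivalent:

```python
import ray

# Annotate this local node with the custom resource, mirroring the
# spark_on_spot node type in the YAML above.
ray.init(resources={"spark_executor": 100})
```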

One thing worth noting is that you can use `spark.ray.raydp_spark_executor.actor.resource.cpu` to oversubscribe
CPU resources by setting the logical CPU smaller than the number of cores per executor. In this case, you can
schedule more executor cores than the total vCPUs on a node, which is useful if your workload is not CPU-bound:

```python
import raydp

spark = raydp.init_spark(app_name='RayDP Oversubscribe Example',
                         num_executors=1,
                         executor_cores=3, # The executor can run 3 tasks in parallel
                         executor_memory=1 * 1024 * 1024 * 1024,
                         configs = {
                             # ...
                             'spark.ray.raydp_spark_executor.actor.resource.cpu': 1, # The actor only occupies 1 logical CPU slot from Ray
                         })
```
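To confirm the oversubscription took effect, you can inspect Ray's logical resource accounting after `init_spark`; a quick, hypothetical sanity check:

```python
import ray

# With the config above, available CPUs shrink by only 1 per executor actor,
# even though each executor runs 3 Spark tasks concurrently.
print(ray.available_resources().get("CPU"))
```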


### External Shuffle Service & Dynamic Resource Allocation

RayDP supports the External Shuffle Service. To enable it, you can either set `spark.shuffle.service.enabled` to `true` in `spark-defaults.conf`, or you can provide a config to `raydp.init_spark`, as shown below:
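For instance, a sketch with illustrative sizing values:

```python
import raydp

# Enable Spark's external shuffle service through init_spark configs.
spark = raydp.init_spark(
    app_name="shuffle_service_example",
    num_executors=2,
    executor_cores=1,
    executor_memory="1G",
    configs={"spark.shuffle.service.enabled": "true"},
)
```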
61 changes: 12 additions & 49 deletions python/raydp/tests/conftest.py
@@ -16,14 +16,14 @@
#

import logging
import subprocess
import time

import pytest
import pyspark
from pyspark.sql import SparkSession
import pytest
import ray
import raydp
import subprocess
from ray.cluster_utils import Cluster
from pyspark.sql import SparkSession


def quiet_logger():
@@ -60,18 +60,20 @@ def spark_on_ray_small(request):
else:
ray.init(address=request.param)
node_ip = ray.util.get_node_ip_address()
spark = raydp.init_spark("test", 1, 1, "500M", configs= {
spark = raydp.init_spark("test", 1, 1, "500M", configs={
"spark.driver.host": node_ip,
"spark.driver.bindAddress": node_ip
})

def stop_all():
raydp.stop_spark()
time.sleep(5)
ray.shutdown()

request.addfinalizer(stop_all)
return spark


@pytest.fixture(scope="function", params=["local", "ray://localhost:10001"])
def spark_on_ray_2_executors(request):
ray.shutdown()
@@ -80,59 +82,19 @@ def spark_on_ray_2_executors(request):
else:
ray.init(address=request.param)
node_ip = ray.util.get_node_ip_address()
spark = raydp.init_spark("test", 2, 1, "500M", configs= {
spark = raydp.init_spark("test", 2, 1, "500M", configs={
"spark.driver.host": node_ip,
"spark.driver.bindAddress": node_ip
})

def stop_all():
raydp.stop_spark()
time.sleep(5)
ray.shutdown()

request.addfinalizer(stop_all)
return spark
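
# For context, tests consume these fixtures through pytest's dependency
# injection. A minimal, hypothetical usage example (not part of this PR):
def test_simple_count(spark_on_ray_2_executors):
    # pytest injects the SparkSession returned by the fixture above.
    spark = spark_on_ray_2_executors
    assert spark.range(100).count() == 100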


@pytest.fixture(scope="function")
def spark_on_ray_fraction_custom_resource(request):
ray.shutdown()
cluster = Cluster(
initialize_head=True,
head_node_args={
"num_cpus": 2
})
ray.init(address=cluster.address)

def stop_all():
raydp.stop_spark()
ray.shutdown()

request.addfinalizer(stop_all)


@pytest.fixture(scope="function")
def spark_on_ray_fractional_cpu(request):
ray.shutdown()
cluster = Cluster(
initialize_head=True,
head_node_args={
"num_cpus": 2
})

ray.init(address=cluster.address)

spark = raydp.init_spark(app_name="test_cpu_fraction",
num_executors=1, executor_cores=3, executor_memory="500M",
configs={"spark.ray.actor.resource.cpu": "0.1"})

def stop_all():
raydp.stop_spark()
ray.shutdown()

request.addfinalizer(stop_all)
return spark


@pytest.fixture(scope='session')
def custom_spark_dir(tmp_path_factory) -> str:
working_dir = tmp_path_factory.mktemp("spark").as_posix()
@@ -153,7 +115,8 @@ def custom_spark_dir(tmp_path_factory) -> str:

import wget

wget.download(f"https://archive.apache.org/dist/spark/spark-{pyspark.__version__}/{spark_distribution}.{file_extension}",
spark_distribution_file)
wget.download(
f"https://archive.apache.org/dist/spark/spark-{pyspark.__version__}/{spark_distribution}.{file_extension}",
spark_distribution_file)
subprocess.check_output(['tar', 'xzvf', spark_distribution_file, '--directory', working_dir])
return f"{working_dir}/{spark_distribution}"