diff --git a/.github/workflows/raydp.yml b/.github/workflows/raydp.yml index e57f51a2..97bb2c72 100644 --- a/.github/workflows/raydp.yml +++ b/.github/workflows/raydp.yml @@ -105,8 +105,7 @@ jobs: - name: Test with pytest run: | ray start --head --num-cpus 6 - pytest python/raydp/tests/ -v -m"not error_on_custom_resource" - pytest python/raydp/tests/ -v -m"error_on_custom_resource" + pytest python/raydp/tests/ -v ray stop --force - name: Test Examples run: | diff --git a/core/raydp-main/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java b/core/raydp-main/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java index 1aa75cf5..d00088c4 100644 --- a/core/raydp-main/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java +++ b/core/raydp-main/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java @@ -1,7 +1,12 @@ package org.apache.spark.raydp; + public class SparkOnRayConfigs { + @Deprecated public static final String RAY_ACTOR_RESOURCE_PREFIX = "spark.ray.actor.resource"; + + public static final String SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX = + "spark.ray.raydp_spark_executor.actor.resource"; public static final String SPARK_MASTER_ACTOR_RESOURCE_PREFIX = "spark.ray.raydp_spark_master.actor.resource"; /** @@ -10,8 +15,18 @@ public class SparkOnRayConfigs { * This is different from spark.executor.cores, which defines the task parallelism * inside a stage. */ + @Deprecated public static final String RAY_ACTOR_CPU_RESOURCE = RAY_ACTOR_RESOURCE_PREFIX + ".cpu"; + /** + * CPU cores per Ray Actor which hosts the Spark executor; this resource is used + * for scheduling. Default value is 1. + * This is different from spark.executor.cores, which defines the task parallelism + * inside a stage. + */ + public static final String SPARK_EXECUTOR_ACTOR_CPU_RESOURCE = + SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX + ".cpu"; + public static final int DEFAULT_SPARK_CORES_PER_EXECUTOR = 1; // For below log4j related configs, there are multiple JVM processes diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index 73a86ec8..99b79a05 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -260,7 +260,9 @@ class RayAppMaster(host: String, val executorId = s"${appInfo.getNextExecutorId()}" logInfo(s"Requesting Spark executor with Ray logical resource " + - s"{ CPU: ${rayActorCPU} }..") + s"{ CPU: ${rayActorCPU}, " + + s"${appInfo.desc.resourceReqsPerExecutor + .map{ case (name, amount) => s"${name}: ${amount}"}.mkString(", ")} }..") // TODO: Support generic fractional logical resources using prefix spark.ray.actor.resource.* val handler = RayExecutorUtils.createExecutorActor( @@ -269,7 +271,8 @@ class RayAppMaster(host: String, memory, // This won't work, Spark expect integer in custom resources, // please see python test test_spark_on_fractional_custom_resource - appInfo.desc.resourceReqsPerExecutor.map(pair => (pair._1, Double.box(pair._2))).asJava, + appInfo.desc.resourceReqsPerExecutor + .map{ case (name, amount) => (name, Double.box(amount))}.asJava, placementGroup, getNextBundleIndex, seqAsJavaList(appInfo.desc.command.javaOpts)) diff --git a/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala 
b/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala index ff42be10..b5ecb7f9 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala @@ -27,7 +27,7 @@ import scala.concurrent.Future import io.ray.api.{ActorHandle, Ray} -import org.apache.spark.{RayDPException, SparkConf, SparkContext} +import org.apache.spark.{RayDPException, SparkConf, SparkContext, SparkException} import org.apache.spark.deploy.raydp._ import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.{config, Logging} @@ -167,12 +167,16 @@ class RayCoarseGrainedSchedulerBackend( val executorResourceReqs = ResourceUtils.parseResourceRequirements( conf, config.SPARK_EXECUTOR_PREFIX) - val resourcesInMap = transferResourceRequirements(executorResourceReqs) + val raydpExecutorCustomResources = parseRayDPResourceRequirements(conf) + + val resourcesInMap = transferResourceRequirements(executorResourceReqs) ++ + raydpExecutorCustomResources val numExecutors = conf.get(config.EXECUTOR_INSTANCES).get val sparkCoresPerExecutor = coresPerExecutor .getOrElse(SparkOnRayConfigs.DEFAULT_SPARK_CORES_PER_EXECUTOR) - val rayActorCPU = conf.get(SparkOnRayConfigs.RAY_ACTOR_CPU_RESOURCE, - sparkCoresPerExecutor.toString).toDouble + val rayActorCPU = conf.get(SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE, + conf.get(SparkOnRayConfigs.RAY_ACTOR_CPU_RESOURCE, + sparkCoresPerExecutor.toString)).toDouble val appDesc = ApplicationDescription(name = sc.appName, numExecutors = numExecutors, coresPerExecutor = coresPerExecutor, memoryPerExecutorMB = sc.executorMemory, @@ -196,6 +200,21 @@ class RayCoarseGrainedSchedulerBackend( } } + def parseRayDPResourceRequirements(sparkConf: SparkConf): Map[String, Double] = { + sparkConf.getAllWithPrefix( + s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX}.") + .filter{ case (key, _) => key.toLowerCase() != "cpu" } + .map{ case (key, _) => key } + .distinct + .map(name => { + val amountDouble = sparkConf.get( + s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX}.${name}", + 0d.toString).toDouble + name->amountDouble + }) + .toMap + } + private def transferResourceRequirements( requirements: Seq[ResourceRequirement]): HashMap[String, Double] = { val results = HashMap[String, Double]() diff --git a/doc/spark_on_ray.md b/doc/spark_on_ray.md index 4622e28d..85b1b0f1 100644 --- a/doc/spark_on_ray.md +++ b/doc/spark_on_ray.md @@ -1,16 +1,23 @@ -### Spark master node affinity +### Spark master actors node affinity -RayDP will create a ray actor called `RayDPSparkMaster`, which will then launch the java process, acting like a Master in a tradtional Spark cluster. By default, this actor could be scheduled to any node in the ray cluster. If you want it to be on a particular node, you can assign some custom resources to that node, and request those resources when starting `RayDPSparkMaster` by setting `spark.ray.raydp_spark_master.resource.*` in `init_spark`. +RayDP will create a ray actor called `RayDPSparkMaster`, which will then launch the java process, +acting like a Master in a traditional Spark cluster. +By default, this actor could be scheduled to any node in the ray cluster. 
+If you want it to be on a particular node, you can assign some custom resources to that node, +and request those resources when starting `RayDPSparkMaster` by setting +`spark.ray.raydp_spark_master.actor.resource.*` in `init_spark`. As an example: ```python +import raydp + raydp.init_spark(..., -configs = { - # ... other configs - 'spark.ray.raydp_spark_master.actor.resource.CPU': 0, - 'spark.ray.raydp_spark_master.actor.resource.spark_master': 1, # Force Spark driver related actor run on headnode -}) + configs = { + # ... other configs + 'spark.ray.raydp_spark_master.actor.resource.CPU': 0, + 'spark.ray.raydp_spark_master.actor.resource.spark_master': 1, # Force Spark driver related actors to run on the head node + }) ``` In cluster config yaml: @@ -22,6 +29,58 @@ available_node_types: spark_master: 100 # Just gave it a large enough number so all drivers are there ``` +### Spark executor actors node affinity + +Similar to master actors node affinity, you can also schedule Spark executors to a specific set of nodes +using custom resources, via the configuration `spark.ray.raydp_spark_executor.actor.resource.[RESOURCE_NAME]`: + +```python +import raydp + +spark = raydp.init_spark(..., + configs = { + # ... + 'spark.ray.raydp_spark_executor.actor.resource.spark_executor': 1, # Schedule executors on nodes with the custom resource spark_executor + }) +``` + +And here is the cluster YAML with the custom resource: + +```yaml +# ... other Ray cluster config +available_node_types: + spark_on_spot: # Spark only nodes + resources: + spark_executor: 100 # custom resource; the name matches the one set in spark.ray.raydp_spark_executor.actor.resource.* + min_workers: 2 + max_workers: 10 # changing this also requires changing the global max_workers + node_config: + # .... + general_spot: # Nodes for general Ray workloads + min_workers: 2 + max_workers: 10 # changing this also requires changing the global max_workers + node_config: + # ... +``` + +One thing worth noting is that you can use `spark.ray.raydp_spark_executor.actor.resource.cpu` to oversubscribe +CPU resources by setting the logical CPU smaller than the number of cores per executor. In this case, you can +schedule more executor cores than the total vCPUs on a node, which is useful if your workload is not CPU bound: + +```python +import raydp + +spark = raydp.init_spark(app_name='RayDP Oversubscribe Example', + num_executors=1, + executor_cores=3, # The executor can run 3 tasks in parallel + executor_memory=1 * 1024 * 1024 * 1024, + configs = { + # ... + 'spark.ray.raydp_spark_executor.actor.resource.cpu': 1, # The actor only occupies 1 logical CPU slot from Ray + }) +``` + + ### External Shuffle Service & Dynamic Resource Allocation RayDP supports External Shuffle Serivce. 
To enable it, you can either set `spark.shuffle.service.enabled` to `true` in `spark-defaults.conf`, or you can provide a config to `raydp.init_spark`, as shown below: diff --git a/python/raydp/tests/conftest.py b/python/raydp/tests/conftest.py index 4707faea..c54fe587 100644 --- a/python/raydp/tests/conftest.py +++ b/python/raydp/tests/conftest.py @@ -16,14 +16,14 @@ # import logging +import subprocess +import time -import pytest import pyspark -from pyspark.sql import SparkSession +import pytest import ray import raydp -import subprocess -from ray.cluster_utils import Cluster +from pyspark.sql import SparkSession def quiet_logger(): @@ -60,18 +60,20 @@ def spark_on_ray_small(request): else: ray.init(address=request.param) node_ip = ray.util.get_node_ip_address() - spark = raydp.init_spark("test", 1, 1, "500M", configs= { + spark = raydp.init_spark("test", 1, 1, "500M", configs={ "spark.driver.host": node_ip, "spark.driver.bindAddress": node_ip }) def stop_all(): raydp.stop_spark() + time.sleep(5) ray.shutdown() request.addfinalizer(stop_all) return spark + @pytest.fixture(scope="function", params=["local", "ray://localhost:10001"]) def spark_on_ray_2_executors(request): ray.shutdown() @@ -80,59 +82,19 @@ def spark_on_ray_2_executors(request): else: ray.init(address=request.param) node_ip = ray.util.get_node_ip_address() - spark = raydp.init_spark("test", 2, 1, "500M", configs= { + spark = raydp.init_spark("test", 2, 1, "500M", configs={ "spark.driver.host": node_ip, "spark.driver.bindAddress": node_ip }) def stop_all(): raydp.stop_spark() + time.sleep(5) ray.shutdown() request.addfinalizer(stop_all) return spark - -@pytest.fixture(scope="function") -def spark_on_ray_fraction_custom_resource(request): - ray.shutdown() - cluster = Cluster( - initialize_head=True, - head_node_args={ - "num_cpus": 2 - }) - ray.init(address=cluster.address) - - def stop_all(): - raydp.stop_spark() - ray.shutdown() - - request.addfinalizer(stop_all) - - -@pytest.fixture(scope="function") -def spark_on_ray_fractional_cpu(request): - ray.shutdown() - cluster = Cluster( - initialize_head=True, - head_node_args={ - "num_cpus": 2 - }) - - ray.init(address=cluster.address) - - spark = raydp.init_spark(app_name="test_cpu_fraction", - num_executors=1, executor_cores=3, executor_memory="500M", - configs={"spark.ray.actor.resource.cpu": "0.1"}) - - def stop_all(): - raydp.stop_spark() - ray.shutdown() - - request.addfinalizer(stop_all) - return spark - - @pytest.fixture(scope='session') def custom_spark_dir(tmp_path_factory) -> str: working_dir = tmp_path_factory.mktemp("spark").as_posix() @@ -153,7 +115,8 @@ def custom_spark_dir(tmp_path_factory) -> str: import wget - wget.download(f"https://archive.apache.org/dist/spark/spark-{pyspark.__version__}/{spark_distribution}.{file_extension}", - spark_distribution_file) + wget.download( + f"https://archive.apache.org/dist/spark/spark-{pyspark.__version__}/{spark_distribution}.{file_extension}", + spark_distribution_file) subprocess.check_output(['tar', 'xzvf', spark_distribution_file, '--directory', working_dir]) return f"{working_dir}/{spark_distribution}" diff --git a/python/raydp/tests/test_spark_cluster.py b/python/raydp/tests/test_spark_cluster.py index bd514470..16040881 100644 --- a/python/raydp/tests/test_spark_cluster.py +++ b/python/raydp/tests/test_spark_cluster.py @@ -39,24 +39,67 @@ def test_spark(spark_on_ray_small): assert result == 10 -def test_spark_on_fractional_cpu(spark_on_ray_fractional_cpu): - spark = spark_on_ray_fractional_cpu +def 
test_legacy_spark_on_fractional_cpu(): + cluster = Cluster( + initialize_head=True, + connect=True, + head_node_args={ + "num_cpus": 2 + }) + + spark = raydp.init_spark(app_name="test_cpu_fraction", + num_executors=1, executor_cores=3, executor_memory="500M", + configs={"spark.ray.actor.resource.cpu": "0.1"}) result = spark.range(0, 10).count() assert result == 10 + spark.stop() + raydp.stop_spark() + time.sleep(5) + ray.shutdown() + cluster.shutdown() -@pytest.mark.error_on_custom_resource -def test_spark_on_fractional_custom_resource(spark_on_ray_fraction_custom_resource): - try: - spark = raydp.init_spark(app_name="test_custom_resource_fraction", - num_executors=1, executor_cores=3, executor_memory="500M", - configs={"spark.executor.resource.CUSTOM.amount": "0.1"}) - spark.range(0, 10).count() - except Exception: - assert True - return - assert False +def test_spark_on_fractional_cpu(): + cluster = Cluster( + initialize_head=True, + connect=True, + head_node_args={ + "num_cpus": 2 + }) + + spark = raydp.init_spark(app_name="test_cpu_fraction", + num_executors=1, executor_cores=3, executor_memory="500M", + configs={"spark.ray.raydp_spark_executor.actor.resource.cpu": "0.1"}) + result = spark.range(0, 10).count() + assert result == 10 + + spark.stop() + raydp.stop_spark() + time.sleep(5) + ray.shutdown() + cluster.shutdown() + + +def test_spark_executor_node_affinity(): + cluster = Cluster( + initialize_head=True, + connect=True, + head_node_args={ + "num_cpus": 1, + }) + cluster.add_node(num_cpus=2, resources={"spark_executor": 10}) + + spark = raydp.init_spark(app_name="test_executor_node_affinity", + num_executors=1, executor_cores=2, executor_memory="500M", + configs={"spark.ray.raydp_spark_executor.actor.resource.spark_executor": "1"}) + result = spark.range(0, 10).count() + assert result == 10 + + raydp.stop_spark() + time.sleep(5) + ray.shutdown() + cluster.shutdown() def test_spark_remote(ray_cluster): @@ -74,6 +117,7 @@ def run(self): def stop(self): self.spark.stop() raydp.stop_spark() + time.sleep(5) driver = SparkRemote.remote() result = ray.get(driver.run.remote()) @@ -171,6 +215,7 @@ def test_placement_group(ray_cluster): ]) assert num_non_removed_pgs == 0 + def test_reconstruction(): cluster = ray.cluster_utils.Cluster() # Head node has 2 cores for necessray actors @@ -183,8 +228,8 @@ def test_reconstruction(): # init_spark before adding nodes to ensure drivers connect to the head node spark = raydp.init_spark('a', 2, 1, '500m', fault_tolerant_mode=True) # Add two nodes, 1 executor each - node_to_kill = cluster.add_node(num_cpus=1, include_dashboard=False,object_store_memory=10**8) - second_node = cluster.add_node(num_cpus=1, include_dashboard=False,object_store_memory=10**8) + node_to_kill = cluster.add_node(num_cpus=1, include_dashboard=False, object_store_memory=10 ** 8) + second_node = cluster.add_node(num_cpus=1, include_dashboard=False, object_store_memory=10 ** 8) # wait for executors to start time.sleep(5) # df should be large enough so that result will be put into plasma @@ -203,6 +248,7 @@ def test_reconstruction(): ray.shutdown() cluster.shutdown() + @pytest.mark.skip("flaky") def test_custom_installed_spark(custom_spark_dir): os.environ["SPARK_HOME"] = custom_spark_dir @@ -225,6 +271,7 @@ def test_custom_installed_spark(custom_spark_dir): assert result == 10 assert spark_home == custom_spark_dir + def start_spark(barrier, i, results): try: # connect to the cluster started before pytest @@ -240,6 +287,7 @@ def start_spark(barrier, i, results): except Exception as 
e: results[i] = -1 + def test_init_spark_twice(): num_processes = 2 ctx = get_context("spawn") @@ -256,5 +304,6 @@ def test_init_spark_twice(): assert results[0] == 10 assert results[1] == 10 + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__]))
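As a usage illustration of the new `spark.ray.raydp_spark_executor.actor.resource.*` configuration introduced by this change, here is a minimal sketch, assuming a running Ray cluster in which at least one node exposes a custom `spark_executor` resource; the app name, addresses, and amounts below are illustrative and not part of the change:

```python
import ray
import raydp

# Assumes an already-running Ray cluster where at least one node was started
# with a custom resource, e.g. `ray start --resources='{"spark_executor": 10}'`.
ray.init(address="auto")

# The custom resource request forces each executor actor onto a node that
# exposes `spark_executor`.
spark = raydp.init_spark(
    app_name="affinity_check",
    num_executors=1,
    executor_cores=1,
    executor_memory="500M",
    configs={
        "spark.ray.raydp_spark_executor.actor.resource.spark_executor": "1",
    })

# Each executor actor consumes one unit of the custom resource, so the
# available amount should drop by the number of executors.
total = ray.cluster_resources().get("spark_executor", 0)
available = ray.available_resources().get("spark_executor", 0)
print(f"spark_executor units held by executors: {total - available}")

raydp.stop_spark()
ray.shutdown()
```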
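A similar sketch for the CPU oversubscription behavior described in the documentation change, again assuming an existing Ray cluster; the before/after accounting is approximate because the `RayDPSparkMaster` actor also reserves resources:

```python
import ray
import raydp

# Assumes an already-running Ray cluster (address and sizes are illustrative).
ray.init(address="auto")

cpus_before = ray.available_resources().get("CPU", 0)

# executor_cores controls Spark task parallelism inside the executor, while the
# new config key caps the logical CPU the executor actor reserves from Ray at 1.
spark = raydp.init_spark(
    app_name="oversubscribe_check",
    num_executors=1,
    executor_cores=3,
    executor_memory="500M",
    configs={
        "spark.ray.raydp_spark_executor.actor.resource.cpu": "1",
    })

cpus_after = ray.available_resources().get("CPU", 0)
# The executor actor holds only 1 logical CPU (not 3); the delta also includes
# whatever the RayDPSparkMaster actor reserves.
print(f"logical CPUs taken from Ray: {cpus_before - cpus_after}")

raydp.stop_spark()
ray.shutdown()
```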