diff --git a/.github/workflows/raydp.yml b/.github/workflows/raydp.yml
index e57f51a2..97bb2c72 100644
--- a/.github/workflows/raydp.yml
+++ b/.github/workflows/raydp.yml
@@ -105,8 +105,7 @@ jobs:
       - name: Test with pytest
         run: |
           ray start --head --num-cpus 6
-          pytest python/raydp/tests/ -v -m"not error_on_custom_resource"
-          pytest python/raydp/tests/ -v -m"error_on_custom_resource"
+          pytest python/raydp/tests/ -v
           ray stop --force
       - name: Test Examples
         run: |
diff --git a/core/raydp-main/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java b/core/raydp-main/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java
index da062f33..d00088c4 100644
--- a/core/raydp-main/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java
+++ b/core/raydp-main/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java
@@ -1,12 +1,12 @@
 package org.apache.spark.raydp;
 
-import scala.deprecated;
 
 public class SparkOnRayConfigs {
     @Deprecated
     public static final String RAY_ACTOR_RESOURCE_PREFIX = "spark.ray.actor.resource";
-    public static final String SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX = "spark.ray.raydp_spark_executor.actor.resource";
+    public static final String SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX =
+            "spark.ray.raydp_spark_executor.actor.resource";
     public static final String SPARK_MASTER_ACTOR_RESOURCE_PREFIX = "spark.ray.raydp_spark_master.actor.resource";
 
     /**
@@ -24,7 +24,8 @@ public class SparkOnRayConfigs {
      * This is different from spark.executor.cores, which defines the task parallelism
      * inside a stage.
      */
-    public static final String SPARK_EXECUTOR_ACTOR_CPU_RESOURCE = SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX + ".cpu";
+    public static final String SPARK_EXECUTOR_ACTOR_CPU_RESOURCE =
+            SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX + ".cpu";
 
     public static final int DEFAULT_SPARK_CORES_PER_EXECUTOR = 1;
diff --git a/doc/spark_on_ray.md b/doc/spark_on_ray.md
index bc39b682..85b1b0f1 100644
--- a/doc/spark_on_ray.md
+++ b/doc/spark_on_ray.md
@@ -32,7 +32,7 @@ available_node_types:
 ### Spark executor actors node affinity
 Similar to master actors node affinity, you can also schedule Spark executor to a specific set of nodes
-using custom resource:
+using a custom resource, via the configuration `spark.ray.raydp_spark_executor.actor.resource.[RESOURCE_NAME]`:
 
 ```python
 import raydp
@@ -51,7 +51,7 @@ And here is the cluster YAML with the customer resource:
 available_node_types:
     spark_on_spot: # Spark only nodes
         resources:
-            spark_executor: 100 # custom resource indicates these node group is for Spark only
+            spark_executor: 100 # custom resource whose name matches the one set in spark.ray.raydp_spark_executor.actor.resource.*
         min_workers: 2
         max_workers: 10 # changing this also need to change the global max_workers
         node_config:
@@ -76,7 +76,7 @@ spark = raydp.init_spark(app_name='RayDP Oversubscribe Example',
                          executor_memory=1 * 1024 * 1024 * 1024,
                          configs = {
                              # ...
-                             'spark.ray.raydp_spark_executor.actor.resource.spark_executor': 1, # The actor only occupy 1 logical CPU slots from Ray
+                             'spark.ray.raydp_spark_executor.actor.resource.cpu': 1, # The actor only occupies 1 logical CPU slot from Ray
                          })
 ```
diff --git a/python/raydp/tests/conftest.py b/python/raydp/tests/conftest.py
index 5e6fc356..c54fe587 100644
--- a/python/raydp/tests/conftest.py
+++ b/python/raydp/tests/conftest.py
@@ -78,7 +78,7 @@ def stop_all():
 def spark_on_ray_2_executors(request):
     ray.shutdown()
     if request.param == "local":
-        ray.init(address="local", num_cpus=10, include_dashboard=False)
+        ray.init(address="local", num_cpus=6, include_dashboard=False)
     else:
         ray.init(address=request.param)
     node_ip = ray.util.get_node_ip_address()
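For reference, the two executor-actor settings documented in this patch compose as in the sketch below. This is a minimal example assembled from the doc changes above, not part of the patch itself; the cluster address, executor count, core count, and memory size are illustrative assumptions.

```python
# Minimal sketch combining the CPU oversubscription setting with executor
# node affinity. Assumes a running Ray cluster whose worker nodes advertise
# the custom "spark_executor" resource, as in the cluster YAML above.
import ray
import raydp

ray.init(address="auto")

spark = raydp.init_spark(
    app_name="RayDP Oversubscribe Example",
    num_executors=2,
    executor_cores=4,  # Spark task parallelism inside each executor
    executor_memory=1 * 1024 * 1024 * 1024,
    configs={
        # Each executor actor reserves only 1 logical CPU from Ray,
        # oversubscribing the node relative to executor_cores=4.
        "spark.ray.raydp_spark_executor.actor.resource.cpu": 1,
        # Pin executor actors to nodes exposing the custom resource
        # named "spark_executor" (matching the cluster YAML).
        "spark.ray.raydp_spark_executor.actor.resource.spark_executor": 1,
    },
)

# ... run Spark work here ...

raydp.stop_spark()
ray.shutdown()
```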