From fb9f4305fdffccb2d6fd1a8b50f2210aeb697031 Mon Sep 17 00:00:00 2001 From: Maziyar Panahi Date: Tue, 18 Jul 2023 11:10:35 +0000 Subject: [PATCH 1/2] Allow users to change Driver's cores at start --- python/sparknlp/__init__.py | 13 ++++++++++++- src/main/scala/com/johnsnowlabs/nlp/SparkNLP.scala | 8 +++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/python/sparknlp/__init__.py b/python/sparknlp/__init__.py index 5eca3c05193f9f..5685670314fcb8 100644 --- a/python/sparknlp/__init__.py +++ b/python/sparknlp/__init__.py @@ -132,13 +132,24 @@ def start(gpu=False, if params is None: params = {} + else: + if not isinstance(params, dict): + raise TypeError('params must be a dictionary like {"spark.executor.memory": "8G"}') + if '_instantiatedSession' in dir(SparkSession) and SparkSession._instantiatedSession is not None: print('Warning::Spark Session already created, some configs may not take.') + driver_cores = "*" + for key, value in params.items(): + if key == "spark.driver.cores": + driver_cores = f"{value}" + # stop at the first match so later keys don't reset the value + break + class SparkNLPConfig: def __init__(self): - self.master, self.app_name = "local[*]", "Spark NLP" + self.master, self.app_name = "local[{}]".format(driver_cores), "Spark NLP" self.serializer, self.serializer_max_buffer = "org.apache.spark.serializer.KryoSerializer", "2000M" self.driver_max_result_size = "0" # Spark NLP on CPU or GPU diff --git a/src/main/scala/com/johnsnowlabs/nlp/SparkNLP.scala b/src/main/scala/com/johnsnowlabs/nlp/SparkNLP.scala index 12904be9b6bd97..73dbedef2002cc 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/SparkNLP.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/SparkNLP.scala @@ -68,12 +68,18 @@ object SparkNLP { val builder = SparkSession .builder() .appName("Spark NLP") - .master("local[*]") .config("spark.driver.memory", memory) .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.kryoserializer.buffer.max", "2000M") 
.config("spark.driver.maxResultSize", "0") + // get the set cores by users since local[*] will override spark.driver.cores if set + if (params.contains("spark.driver.cores")) { + builder.master("local[" + params("spark.driver.cores") + "]") + } else { + builder.master("local[*]") + } + val sparkNlpJar = if (apple_silicon) MavenSparkSilicon else if (aarch64) MavenSparkAarch64 From eac9cb01ccff37471a18a4c37291f766ec4ebb7c Mon Sep 17 00:00:00 2001 From: Maziyar Panahi Date: Tue, 18 Jul 2023 11:11:02 +0000 Subject: [PATCH 2/2] Add unit test for driver cores in start function --- .../scala/com/johnsnowlabs/nlp/SparkNLPTestSpec.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/test/scala/com/johnsnowlabs/nlp/SparkNLPTestSpec.scala b/src/test/scala/com/johnsnowlabs/nlp/SparkNLPTestSpec.scala index 8fa879b07bc184..69d7e7181ca160 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/SparkNLPTestSpec.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/SparkNLPTestSpec.scala @@ -1,6 +1,6 @@ package com.johnsnowlabs.nlp -import com.johnsnowlabs.tags.SlowTest +import com.johnsnowlabs.tags.FastTest import com.johnsnowlabs.util.ConfigHelper.{awsJavaSdkVersion, hadoopAwsVersion} import org.scalatest.flatspec.AnyFlatSpec @@ -8,14 +8,16 @@ class SparkNLPTestSpec extends AnyFlatSpec { behavior of "SparkNLPTestSpec" - it should "start with extra parameters" taggedAs SlowTest ignore { + it should "start with extra parameters" taggedAs FastTest in { val extraParams: Map[String, String] = Map( "spark.jars.packages" -> ("org.apache.hadoop:hadoop-aws:" + hadoopAwsVersion + ",com.amazonaws:aws-java-sdk:" + awsJavaSdkVersion), - "spark.hadoop.fs.s3a.path.style.access" -> "true") + "spark.hadoop.fs.s3a.path.style.access" -> "true", + "spark.driver.cores" -> "2") val spark = SparkNLP.start(params = extraParams) assert(spark.conf.get("spark.hadoop.fs.s3a.path.style.access") == "true") + assert(spark.conf.get("spark.master") == "local[2]") Seq( 
"com.johnsnowlabs.nlp:spark-nlp",