From 454463bd12f1e7ebde675cd7f6952f7f45750b0c Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Wed, 22 Jan 2025 16:05:29 +0800 Subject: [PATCH] [SPARK-50879][ML][PYTHON][CONNECT][FOLLOW-UP] Support RobustScaler on Connect ### What changes were proposed in this pull request? Support RobustScaler on Connect ### Why are the changes needed? feature parity ### Does this PR introduce _any_ user-facing change? yes, new feature ### How was this patch tested? added test ### Was this patch authored or co-authored using generative AI tooling? no Closes #49597 from zhengruifeng/ml_connect_robust_scaler. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../services/org.apache.spark.ml.Estimator | 1 + .../services/org.apache.spark.ml.Transformer | 1 + .../spark/ml/feature/RobustScaler.scala | 2 + python/pyspark/ml/tests/test_feature.py | 47 +++++++++++++++++-- .../apache/spark/sql/connect/ml/MLUtils.scala | 2 + 5 files changed, 50 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator index 23f70521214d0..6221c61a06709 100644 --- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator +++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator @@ -49,3 +49,4 @@ org.apache.spark.ml.fpm.FPGrowth org.apache.spark.ml.feature.StandardScaler org.apache.spark.ml.feature.MaxAbsScaler org.apache.spark.ml.feature.MinMaxScaler +org.apache.spark.ml.feature.RobustScaler diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer index a25c03ed2b8e0..47e9f4b79e346 100644 --- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer +++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer @@ -47,3 +47,4 @@ org.apache.spark.ml.fpm.FPGrowthModel org.apache.spark.ml.feature.StandardScalerModel org.apache.spark.ml.feature.MaxAbsScalerModel org.apache.spark.ml.feature.MinMaxScalerModel +org.apache.spark.ml.feature.RobustScalerModel diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala index f3e068f049205..c77f7008d05a3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala @@ -230,6 +230,8 @@ class RobustScalerModel private[ml] ( import RobustScalerModel._ + private[ml] def this() = this(Identifiable.randomUID("robustScal"), Vectors.empty, Vectors.empty) + /** @group setParam */ def setInputCol(value: String): this.type = set(inputCol, value) diff --git a/python/pyspark/ml/tests/test_feature.py b/python/pyspark/ml/tests/test_feature.py index 51c7a3631e1b1..dba4afc213bad 100644 --- a/python/pyspark/ml/tests/test_feature.py +++ b/python/pyspark/ml/tests/test_feature.py @@ -35,6 +35,8 @@ MaxAbsScalerModel, MinMaxScaler, MinMaxScalerModel, + RobustScaler, + RobustScalerModel, StopWordsRemover, StringIndexer, StringIndexerModel, @@ -103,7 +105,7 @@ def test_standard_scaler(self): ["index", "weight", "features"], ) .coalesce(1) - .sortWithinPartitions("weight") + .sortWithinPartitions("index") .select("features") ) scaler = StandardScaler(inputCol="features", outputCol="scaled") @@ -141,7 +143,7 @@ def test_maxabs_scaler(self): ["index", "weight", "features"], ) .coalesce(1) - .sortWithinPartitions("weight") + .sortWithinPartitions("index") .select("features") ) @@ -179,7 +181,7 @@ def test_minmax_scaler(self): ["index", "weight", "features"], ) .coalesce(1) - .sortWithinPartitions("weight") + .sortWithinPartitions("index") .select("features") ) @@ -207,6 +209,45 @@ def test_minmax_scaler(self): model2 = MinMaxScalerModel.load(d) self.assertEqual(str(model), str(model2)) + def test_robust_scaler(self): + df = ( + self.spark.createDataFrame( + [ + (1, 1.0, Vectors.dense([0.0])), + (2, 2.0, Vectors.dense([2.0])), + (3, 3.0, Vectors.sparse(1, [(0, 3.0)])), + ], + ["index", "weight", "features"], + ) + .coalesce(1) + .sortWithinPartitions("index") + .select("features") + ) + + scaler = RobustScaler(inputCol="features", outputCol="scaled") + self.assertEqual(scaler.getInputCol(), "features") + self.assertEqual(scaler.getOutputCol(), "scaled") + + # Estimator save & load + with tempfile.TemporaryDirectory(prefix="robust_scaler") as d: + scaler.write().overwrite().save(d) + scaler2 = RobustScaler.load(d) + self.assertEqual(str(scaler), str(scaler2)) + + model = scaler.fit(df) + self.assertTrue(np.allclose(model.range.toArray(), [3.0], atol=1e-4)) + self.assertTrue(np.allclose(model.median.toArray(), [2.0], atol=1e-4)) + + output = model.transform(df) + self.assertEqual(output.columns, ["features", "scaled"]) + self.assertEqual(output.count(), 3) + + # Model save & load + with tempfile.TemporaryDirectory(prefix="robust_scaler_model") as d: + model.write().overwrite().save(d) + model2 = RobustScalerModel.load(d) + self.assertEqual(str(model), str(model2)) + def test_binarizer(self): b0 = Binarizer() self.assertListEqual( diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala index 34a0317f55af5..40345089ae831 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala @@ -454,6 +454,8 @@ private[ml] object MLUtils { "maxAbs", // MaxAbsScalerModel "originalMax", // MinMaxScalerModel "originalMin", // MinMaxScalerModel + "range", // RobustScalerModel + "median", // RobustScalerModel "toString", "toDebugString", "numFeatures",