[SPARK-22810][ML][PYSPARK] Expose Python API for LinearRegression with huber loss.

## What changes were proposed in this pull request?
Expose Python API for _LinearRegression_ with _huber_ loss.
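As a quick illustration (not part of the patch itself), a minimal sketch of how the newly exposed options could be used from PySpark, assuming an active `SparkSession` named `spark` and a made-up toy dataset:

```python
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression

# Tiny made-up dataset with a single feature, just to exercise the API.
df = spark.createDataFrame(
    [(1.0, Vectors.dense(1.0)),
     (0.0, Vectors.sparse(1, [], []))],
    ["label", "features"])

# loss="huber" and epsilon are the parameters exposed by this change;
# epsilon controls where the loss switches from quadratic to linear behavior.
lr = LinearRegression(maxIter=10, loss="huber", epsilon=1.35)
model = lr.fit(df)

print(model.coefficients)  # fitted coefficients
print(model.intercept)     # fitted intercept
print(model.scale)         # estimated huber scale (1.0 when loss="squaredError")
```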

## How was this patch tested?
Unit test.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #19994 from yanboliang/spark-22810.
yanboliang committed Dec 21, 2017
1 parent 0114c89 commit fb0562f
Showing 4 changed files with 96 additions and 15 deletions.
python/pyspark/ml/param/_shared_params_code_gen.py (3 changes: 2 additions & 1 deletion)
@@ -154,7 +154,8 @@ def get$Name(self):
("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2",
"TypeConverters.toInt"),
("parallelism", "the number of threads to use when running parallel algorithms (>= 1).",
"1", "TypeConverters.toInt")]
"1", "TypeConverters.toInt"),
("loss", "the loss function to be optimized.", None, "TypeConverters.toString")]

code = []
for name, doc, defaultValueStr, typeConverter in shared:
python/pyspark/ml/param/shared.py (23 changes: 23 additions & 0 deletions)
@@ -632,6 +632,29 @@ def getParallelism(self):
return self.getOrDefault(self.parallelism)


class HasLoss(Params):
"""
Mixin for param loss: the loss function to be optimized.
"""

loss = Param(Params._dummy(), "loss", "the loss function to be optimized.", typeConverter=TypeConverters.toString)

def __init__(self):
super(HasLoss, self).__init__()

def setLoss(self, value):
"""
Sets the value of :py:attr:`loss`.
"""
return self._set(loss=value)

def getLoss(self):
"""
Gets the value of loss or its default value.
"""
return self.getOrDefault(self.loss)


class DecisionTreeParams(Params):
"""
Mixin for Decision Tree parameters.
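For context, a small hedged sketch of how the generated `HasLoss` mixin above surfaces on an estimator once it is mixed in (using `LinearRegression` from the next file, after this change, and the default it sets in `__init__`):

```python
from pyspark.ml.regression import LinearRegression

lr = LinearRegression()
print(lr.getLoss())   # "squaredError" -- the default set in LinearRegression.__init__
lr.setLoss("huber")   # setter generated for the shared "loss" param
print(lr.getLoss())   # "huber"
```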
python/pyspark/ml/regression.py (64 changes: 50 additions & 14 deletions)
@@ -39,23 +39,26 @@
@inherit_doc
class LinearRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter,
HasRegParam, HasTol, HasElasticNetParam, HasFitIntercept,
HasStandardization, HasSolver, HasWeightCol, HasAggregationDepth,
HasStandardization, HasSolver, HasWeightCol, HasAggregationDepth, HasLoss,
JavaMLWritable, JavaMLReadable):
"""
Linear regression.
- The learning objective is to minimize the squared error, with regularization.
- The specific squared error loss function used is: L = 1/2n ||A coefficients - y||^2^
- This supports multiple types of regularization:
- * none (a.k.a. ordinary least squares)
- * L2 (ridge regression)
- * L1 (Lasso)
- * L2 + L1 (elastic net)
+ The learning objective is to minimize the specified loss function, with regularization.
+ This supports two kinds of loss:
+ * squaredError (a.k.a squared loss)
+ * huber (a hybrid of squared error for relatively small errors and absolute error for \
+   relatively large ones, and we estimate the scale parameter from training data)
+ This supports multiple types of regularization:
+ * none (a.k.a. ordinary least squares)
+ * L2 (ridge regression)
+ * L1 (Lasso)
+ * L2 + L1 (elastic net)
+ Note: Fitting with huber loss only supports none and L2 regularization.
>>> from pyspark.ml.linalg import Vectors
>>> df = spark.createDataFrame([
@@ -98,31 +101,42 @@ class LinearRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPrediction
solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
"options: auto, normal, l-bfgs.", typeConverter=TypeConverters.toString)

loss = Param(Params._dummy(), "loss", "The loss function to be optimized. Supported " +
"options: squaredError, huber.", typeConverter=TypeConverters.toString)

epsilon = Param(Params._dummy(), "epsilon", "The shape parameter to control the amount of " +
"robustness. Must be > 1.0. Only valid when loss is huber",
typeConverter=TypeConverters.toFloat)

@keyword_only
def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
standardization=True, solver="auto", weightCol=None, aggregationDepth=2):
standardization=True, solver="auto", weightCol=None, aggregationDepth=2,
loss="squaredError", epsilon=1.35):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
standardization=True, solver="auto", weightCol=None, aggregationDepth=2)
standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \
loss="squaredError", epsilon=1.35)
"""
super(LinearRegression, self).__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.regression.LinearRegression", self.uid)
self._setDefault(maxIter=100, regParam=0.0, tol=1e-6)
self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35)
kwargs = self._input_kwargs
self.setParams(**kwargs)

@keyword_only
@since("1.4.0")
def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
standardization=True, solver="auto", weightCol=None, aggregationDepth=2):
standardization=True, solver="auto", weightCol=None, aggregationDepth=2,
loss="squaredError", epsilon=1.35):
"""
setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
standardization=True, solver="auto", weightCol=None, aggregationDepth=2)
standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \
loss="squaredError", epsilon=1.35)
Sets params for linear regression.
"""
kwargs = self._input_kwargs
@@ -131,6 +145,20 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre
def _create_model(self, java_model):
return LinearRegressionModel(java_model)

@since("2.3.0")
def setEpsilon(self, value):
"""
Sets the value of :py:attr:`epsilon`.
"""
return self._set(epsilon=value)

@since("2.3.0")
def getEpsilon(self):
"""
Gets the value of epsilon or its default value.
"""
return self.getOrDefault(self.epsilon)


class LinearRegressionModel(JavaModel, JavaPredictionModel, JavaMLWritable, JavaMLReadable):
"""
@@ -155,6 +183,14 @@ def intercept(self):
"""
return self._call_java("intercept")

@property
@since("2.3.0")
def scale(self):
"""
The value by which \|y - X'w\| is scaled down when loss is "huber", otherwise 1.0.
"""
return self._call_java("scale")

@property
@since("2.0.0")
def summary(self):
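For readers unfamiliar with the huber option described in the docstring above: it is based on the classic Huber function, quadratic for small residuals and linear for large ones, with `epsilon` marking the transition. Spark additionally estimates a scale parameter from the training data (surfaced as the new `scale` property) and applies regularization, so the sketch below is only the core function under one common convention, not the exact internal objective:

```python
def huber(z, epsilon=1.35):
    """Classic Huber function (one common convention): quadratic for small
    residuals, linear for large ones; epsilon marks the transition point."""
    if abs(z) <= epsilon:
        return 0.5 * z * z
    return epsilon * (abs(z) - 0.5 * epsilon)

# Small residuals are penalized like squared error, large ones only linearly,
# which is what makes the fit robust to outliers.
print(huber(0.5))   # 0.125
print(huber(10.0))  # 12.58875
```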
python/pyspark/ml/tests.py (21 changes: 21 additions & 0 deletions)
@@ -1726,6 +1726,27 @@ def test_offset(self):
self.assertTrue(np.isclose(model.intercept, -1.561613, atol=1E-4))


class LinearRegressionTest(SparkSessionTestCase):

def test_linear_regression_with_huber_loss(self):

data_path = "data/mllib/sample_linear_regression_data.txt"
df = self.spark.read.format("libsvm").load(data_path)

lir = LinearRegression(loss="huber", epsilon=2.0)
model = lir.fit(df)

expectedCoefficients = [0.136, 0.7648, -0.7761, 2.4236, 0.537,
1.2612, -0.333, -0.5694, -0.6311, 0.6053]
expectedIntercept = 0.1607
expectedScale = 9.758

self.assertTrue(
np.allclose(model.coefficients.toArray(), expectedCoefficients, atol=1E-3))
self.assertTrue(np.isclose(model.intercept, expectedIntercept, atol=1E-3))
self.assertTrue(np.isclose(model.scale, expectedScale, atol=1E-3))


class LogisticRegressionTest(SparkSessionTestCase):

def test_binomial_logistic_regression_with_bound(self):
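A hedged way to reproduce the new test interactively, for example from a pyspark shell started at the Spark source root so the relative data path resolves; the expected values simply restate what the test above asserts within its tolerances:

```python
from pyspark.ml.regression import LinearRegression

df = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
model = LinearRegression(loss="huber", epsilon=2.0).fit(df)

print(model.coefficients)  # roughly [0.136, 0.7648, -0.7761, 2.4236, 0.537,
                           #          1.2612, -0.333, -0.5694, -0.6311, 0.6053]
print(model.intercept)     # roughly 0.1607
print(model.scale)         # roughly 9.758
```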
