Added Normalizer, StandardScaler to ml-features doc, plus small Java unit tests
jkbradley committed May 21, 2015
1 parent a21c2d6 commit 0a862f9
Showing 3 changed files with 292 additions and 0 deletions.
153 changes: 153 additions & 0 deletions docs/ml-features.md
@@ -287,6 +287,7 @@ for words_label in wordsDataFrame.select("words", "label").take(3):
</div>
</div>


## Binarizer

Binarization is the process of thresholding numerical features to binary features. Since some probabilistic estimators assume that the input data follows a [Bernoulli distribution](http://en.wikipedia.org/wiki/Bernoulli_distribution), a binarizer is useful for pre-processing input data with continuous numerical features.
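
As a minimal Scala sketch (the toy data, column names, and `0.5` threshold below are illustrative only), a `Binarizer` maps feature values above its `threshold` to `1.0` and values at or below it to `0.0`:

{% highlight scala %}
import org.apache.spark.ml.feature.Binarizer

val data = Seq((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = sqlContext.createDataFrame(data).toDF("label", "feature")

// Values greater than the threshold become 1.0; values at or below it become 0.0.
val binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)
{% endhighlight %}
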
@@ -634,5 +635,157 @@ indexedData = indexerModel.transform(data)
</div>


## Normalizer

`Normalizer` is a `Transformer` which transforms a dataset of `Vector` rows, normalizing each `Vector` to have unit norm. It takes a parameter `p`, which specifies the [p-norm](http://en.wikipedia.org/wiki/Norm_%28mathematics%29#p-norm) used for normalization ($p = 2$ by default). This normalization can help standardize your input data and improve the behavior of learning algorithms.
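
For example, with the default $p = 2$, the row $(3, 4)$ has norm $\sqrt{3^2 + 4^2} = 5$ and is rescaled to $(0.6, 0.8)$; with $p = \infty$, each row is divided by its largest absolute entry, so $(3, 4)$ becomes $(0.75, 1.0)$.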

The following example demonstrates how to load a dataset in libsvm format and then normalize each row to have unit $L^2$ norm and unit $L^\infty$ norm.

<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val dataFrame = sqlContext.createDataFrame(data)
val normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures")

// Normalize each Vector using $L^2$ norm.
val l2NormData = normalizer.transform(dataFrame, normalizer.p -> 2)

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
{% endhighlight %}
</div>

<div data-lang="java">
{% highlight java %}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.sql.DataFrame;

JavaRDD<LabeledPoint> data =
  MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();
DataFrame dataFrame = jsql.createDataFrame(data, LabeledPoint.class);
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures");

// Normalize each Vector using $L^2$ norm.
DataFrame l2NormData = normalizer.transform(dataFrame, normalizer.p().w(2));

// Normalize each Vector using $L^\infty$ norm.
DataFrame lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
{% endhighlight %}
</div>

<div data-lang="python">
{% highlight python %}
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import Normalizer

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
dataFrame = sqlContext.createDataFrame(data)
normalizer = Normalizer(inputCol="features", outputCol="normFeatures")

# Normalize each Vector using $L^2$ norm.
l2NormData = normalizer.transform(dataFrame, {normalizer.p:2.0})

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p:float("inf")})
{% endhighlight %}
</div>
</div>
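
Note that in the examples above, `p` is passed as an extra parameter at `transform` time, which overrides the value embedded in the `Normalizer` instance (the default $p = 2$) for that call only; it can also be set directly on the `Normalizer` instance instead.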


## StandardScaler

`StandardScaler` transforms a dataset of `Vector` rows, normalizing each feature to have unit standard deviation and/or zero mean. It takes parameters:

* `withStd`: True by default. Scales the data to unit standard deviation.
* `withMean`: False by default. Centers the data to zero mean before scaling. It builds a dense output, so it does not work on sparse input and will raise an exception.
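
With both parameters enabled, each feature value $x$ is transformed to $(x - \mu) / \sigma$, where $\mu$ and $\sigma$ are that feature's mean and standard deviation computed over the dataset; with the defaults above (`withStd` only), each value is simply divided by $\sigma$.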

`StandardScaler` is an `Estimator` which can be `fit` on a dataset to produce a `StandardScalerModel`; this amounts to computing summary statistics. The model can then transform a `Vector` column in a dataset so that its features have unit standard deviation and/or zero mean.

Note that if the standard deviation of a feature is zero, the model will return the default `0.0` value in the `Vector` for that feature.

More details can be found in the API docs for
[StandardScaler](api/scala/index.html#org.apache.spark.ml.feature.StandardScaler) and
[StandardScalerModel](api/scala/index.html#org.apache.spark.ml.feature.StandardScalerModel).

The following example demonstrates how to load a dataset in libsvm format and then normalize each feature to have unit standard deviation.

<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val dataFrame = sqlContext.createDataFrame(data)
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
{% endhighlight %}
</div>

<div data-lang="java">
{% highlight java %}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.sql.DataFrame;

JavaRDD<LabeledPoint> data =
  MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();
DataFrame dataFrame = jsql.createDataFrame(data, LabeledPoint.class);
StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
DataFrame scaledData = scalerModel.transform(dataFrame);
{% endhighlight %}
</div>

<div data-lang="python">
{% highlight python %}
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StandardScaler

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
dataFrame = sqlContext.createDataFrame(data)
scaler = StandardScaler(inputCol="features", outputCol="normFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
{% endhighlight %}
</div>
</div>


# Feature Selectors

69 changes: 69 additions & 0 deletions mllib/src/test/java/org/apache/spark/ml/feature/JavaNormalizerSuite.java
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature;

import java.util.List;

import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class JavaNormalizerSuite {
  private transient JavaSparkContext jsc;
  private transient SQLContext jsql;

  @Before
  public void setUp() {
    jsc = new JavaSparkContext("local", "JavaNormalizerSuite");
    jsql = new SQLContext(jsc);
  }

  @After
  public void tearDown() {
    jsc.stop();
    jsc = null;
  }

  @Test
  public void normalizer() {
    // The tests are to check Java compatibility.
    List<VectorIndexerSuite.FeatureData> points = Lists.newArrayList(
      new VectorIndexerSuite.FeatureData(Vectors.dense(0.0, -2.0)),
      new VectorIndexerSuite.FeatureData(Vectors.dense(1.0, 3.0)),
      new VectorIndexerSuite.FeatureData(Vectors.dense(1.0, 4.0))
    );
    DataFrame dataFrame = jsql.createDataFrame(jsc.parallelize(points, 2),
      VectorIndexerSuite.FeatureData.class);
    Normalizer normalizer = new Normalizer()
      .setInputCol("features")
      .setOutputCol("normFeatures");

    // Normalize each Vector using $L^2$ norm.
    DataFrame l2NormData = normalizer.transform(dataFrame, normalizer.p().w(2));

    // Normalize each Vector using $L^\infty$ norm.
    DataFrame lInfNormData =
      normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
  }
}
70 changes: 70 additions & 0 deletions mllib/src/test/java/org/apache/spark/ml/feature/JavaStandardScalerSuite.java
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature;

import java.util.List;

import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class JavaStandardScalerSuite {
  private transient JavaSparkContext jsc;
  private transient SQLContext jsql;

  @Before
  public void setUp() {
    jsc = new JavaSparkContext("local", "JavaStandardScalerSuite");
    jsql = new SQLContext(jsc);
  }

  @After
  public void tearDown() {
    jsc.stop();
    jsc = null;
  }

  @Test
  public void standardScaler() {
    // The tests are to check Java compatibility.
    List<VectorIndexerSuite.FeatureData> points = Lists.newArrayList(
      new VectorIndexerSuite.FeatureData(Vectors.dense(0.0, -2.0)),
      new VectorIndexerSuite.FeatureData(Vectors.dense(1.0, 3.0)),
      new VectorIndexerSuite.FeatureData(Vectors.dense(1.0, 4.0))
    );
    DataFrame dataFrame = jsql.createDataFrame(jsc.parallelize(points, 2),
      VectorIndexerSuite.FeatureData.class);
    StandardScaler scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("normFeatures")
      .setWithStd(true)
      .setWithMean(false);

    // Compute summary statistics by fitting the StandardScaler.
    StandardScalerModel scalerModel = scaler.fit(dataFrame);

    // Normalize each feature to have unit standard deviation.
    DataFrame scaledData = scalerModel.transform(dataFrame);
  }
}
