Skip to content

Commit

Permalink
[SPARK-6227][MLLIB][PYSPARK] Implement PySpark wrappers for SVD and P…
Browse files Browse the repository at this point in the history
…CA (v2)

Add PCA and SVD to PySpark's wrappers for `RowMatrix` and `IndexedRowMatrix` (SVD only).

Based on apache#7963, updated.

## How was this patch tested?

New doc tests and unit tests. Ran all examples locally.

Author: MechCoder <manojkumarsivaraj334@gmail.com>
Author: Nick Pentreath <nickp@za.ibm.com>

Closes apache#17621 from MLnick/SPARK-6227-pyspark-svd-pca.
  • Loading branch information
MechCoder authored and Nick Pentreath committed May 3, 2017
1 parent 6235132 commit db2fb84
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 46 deletions.
29 changes: 17 additions & 12 deletions docs/mllib-dimensionality-reduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ Refer to the [`SingularValueDecomposition` Java docs](api/java/org/apache/spark/

The same code applies to `IndexedRowMatrix` if `U` is defined as an
`IndexedRowMatrix`.
</div>
<div data-lang="python" markdown="1">
Refer to the [`SingularValueDecomposition` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.SingularValueDecomposition) for details on the API.

In order to run the above application, follow the instructions
provided in the [Self-Contained
Applications](quick-start.html#self-contained-applications) section of the Spark
quick-start guide. Be sure to also include *spark-mllib* to your build file as
a dependency.
{% include_example python/mllib/svd_example.py %}

The same code applies to `IndexedRowMatrix` if `U` is defined as an
`IndexedRowMatrix`.
</div>
</div>

Expand Down Expand Up @@ -118,17 +119,21 @@ Refer to the [`PCA` Scala docs](api/scala/index.html#org.apache.spark.mllib.feat

The following code demonstrates how to compute principal components on a `RowMatrix`
and use them to project the vectors into a low-dimensional space.
The number of columns should be small, e.g, less than 1000.

Refer to the [`RowMatrix` Java docs](api/java/org/apache/spark/mllib/linalg/distributed/RowMatrix.html) for details on the API.

{% include_example java/org/apache/spark/examples/mllib/JavaPCAExample.java %}

</div>
</div>

In order to run the above application, follow the instructions
provided in the [Self-Contained Applications](quick-start.html#self-contained-applications)
section of the Spark
quick-start guide. Be sure to also include *spark-mllib* to your build file as
a dependency.
<div data-lang="python" markdown="1">

The following code demonstrates how to compute principal components on a `RowMatrix`
and use them to project the vectors into a low-dimensional space.

Refer to the [`RowMatrix` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix) for details on the API.

{% include_example python/mllib/pca_rowmatrix_example.py %}

</div>
</div>
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.examples.mllib;

// $example on$
import java.util.LinkedList;
import java.util.Arrays;
import java.util.List;
// $example off$

import org.apache.spark.SparkConf;
Expand All @@ -39,28 +40,32 @@ public class JavaPCAExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);

// $example on$
double[][] array = {{1.12, 2.05, 3.12}, {5.56, 6.28, 8.94}, {10.2, 8.0, 20.5}};
LinkedList<Vector> rowsList = new LinkedList<>();
for (int i = 0; i < array.length; i++) {
Vector currentRow = Vectors.dense(array[i]);
rowsList.add(currentRow);
}
JavaRDD<Vector> rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList);
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);

JavaRDD<Vector> rows = jsc.parallelize(data);

// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());

// Compute the top 3 principal components.
Matrix pc = mat.computePrincipalComponents(3);
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
Matrix pc = mat.computePrincipalComponents(4);

// Project the rows to the linear space spanned by the top 4 principal components.
RowMatrix projected = mat.multiply(pc);
// $example off$
Vector[] collectPartitions = (Vector[])projected.rows().collect();
System.out.println("Projected vector of principal component:");
for (Vector vector : collectPartitions) {
System.out.println("\t" + vector);
}
sc.stop();
jsc.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.examples.mllib;

// $example on$
import java.util.LinkedList;
import java.util.Arrays;
import java.util.List;
// $example off$

import org.apache.spark.SparkConf;
Expand All @@ -43,22 +44,22 @@ public static void main(String[] args) {
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);

// $example on$
double[][] array = {{1.12, 2.05, 3.12}, {5.56, 6.28, 8.94}, {10.2, 8.0, 20.5}};
LinkedList<Vector> rowsList = new LinkedList<>();
for (int i = 0; i < array.length; i++) {
Vector currentRow = Vectors.dense(array[i]);
rowsList.add(currentRow);
}
JavaRDD<Vector> rows = jsc.parallelize(rowsList);
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);

JavaRDD<Vector> rows = jsc.parallelize(data);

// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());

// Compute the top 3 singular values and corresponding singular vectors.
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(3, true, 1.0E-9d);
RowMatrix U = svd.U();
Vector s = svd.s();
Matrix V = svd.V();
// Compute the top 5 singular values and corresponding singular vectors.
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
RowMatrix U = svd.U(); // The U factor is a RowMatrix.
Vector s = svd.s(); // The singular values are stored in a local dense vector.
Matrix V = svd.V(); // The V factor is a local dense matrix.
// $example off$
Vector[] collectPartitions = (Vector[]) U.rows().collect();
System.out.println("U factor is:");
Expand Down
46 changes: 46 additions & 0 deletions examples/src/main/python/mllib/pca_rowmatrix_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark import SparkContext
# $example on$
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
# $example off$

if __name__ == "__main__":
sc = SparkContext(appName="PythonPCAOnRowMatrixExample")

# $example on$
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])

mat = RowMatrix(rows)
# Compute the top 4 principal components.
# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(4)

# Project the rows to the linear space spanned by the top 4 principal components.
projected = mat.multiply(pc)
# $example off$
collected = projected.rows.collect()
print("Projected Row Matrix of principal component:")
for vector in collected:
print(vector)
sc.stop()
48 changes: 48 additions & 0 deletions examples/src/main/python/mllib/svd_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark import SparkContext
# $example on$
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
# $example off$

if __name__ == "__main__":
sc = SparkContext(appName="PythonSVDExample")

# $example on$
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])

mat = RowMatrix(rows)

# Compute the top 5 singular values and corresponding singular vectors.
svd = mat.computeSVD(5, computeU=True)
U = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.
# $example off$
collected = U.rows.collect()
print("U factor is:")
for vector in collected:
print(vector)
print("Singular values are: %s" % s)
print("V factor is:\n%s" % V)
sc.stop()
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ object PCAOnRowMatrixExample {
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))

val dataRDD = sc.parallelize(data, 2)
val rows = sc.parallelize(data)

val mat: RowMatrix = new RowMatrix(dataRDD)
val mat: RowMatrix = new RowMatrix(rows)

// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
// $example off$

/**
* Example for SingularValueDecomposition.
*/
object SVDExample {

def main(args: Array[String]): Unit = {
Expand All @@ -41,15 +44,15 @@ object SVDExample {
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))

val dataRDD = sc.parallelize(data, 2)
val rows = sc.parallelize(data)

val mat: RowMatrix = new RowMatrix(dataRDD)
val mat: RowMatrix = new RowMatrix(rows)

// Compute the top 5 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(5, computeU = true)
val U: RowMatrix = svd.U // The U factor is a RowMatrix.
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.
// $example off$
val collect = U.rows.collect()
println("U factor is:")
Expand Down
Loading

0 comments on commit db2fb84

Please sign in to comment.