[SPARK-4259][MLlib]: Add Power Iteration Clustering Algorithm with Gaussian Similarity Function #4254

Closed · wants to merge 31 commits
a3c5fbe
Adding Power Iteration Clustering
fjiang6 Jan 22, 2015
d5aae20
Adding Power Iteration Clustering and Suite test
fjiang6 Jan 22, 2015
3fd5bc8
PIClustering is running in new branch (up to the pseudo-eigenvector c…
sboeschhuawei Jan 23, 2015
0ef163f
Added ConcentricCircles data generation and KMeans clustering
sboeschhuawei Jan 23, 2015
32a90dc
Update circles test data values
sboeschhuawei Jan 23, 2015
0700335
First end to end working version: but has bad performance issue
sboeschhuawei Jan 23, 2015
e5df2b8
First end to end working PIC
sboeschhuawei Jan 24, 2015
9294263
Added visualization/plotting of input/output data
sboeschhuawei Jan 25, 2015
a2b1e57
Revert inadvertent update to KMeans
sboeschhuawei Jan 25, 2015
b7dbcbe
Added axes and combined into single plot for matplotlib
sboeschhuawei Jan 26, 2015
f656c34
Added iris dataset
sboeschhuawei Jan 26, 2015
a112f38
Added graphx main and test jars as dependencies to mllib/pom.xml
sboeschhuawei Jan 26, 2015
ace9749
Update PIClustering.scala
fjiang6 Jan 26, 2015
b29c0db
Update PIClustering.scala
fjiang6 Jan 26, 2015
bea48ea
Converted custom Linear Algebra datatypes/routines to use Breeze.
sboeschhuawei Jan 27, 2015
90e7fa4
Converted from custom Linalg routines to Breeze: added JavaDoc commen…
sboeschhuawei Jan 28, 2015
be659e3
Added mllib specific log4j
sboeschhuawei Jan 28, 2015
060e6bf
Added link to PIC doc from the main clustering md doc
sboeschhuawei Jan 28, 2015
24f438e
fixed incorrect markdown in clustering doc
sboeschhuawei Jan 28, 2015
88aacc8
Add assert to testcase on cluster sizes
sboeschhuawei Jan 28, 2015
43ab10b
Change last two println's to log4j logger
sboeschhuawei Jan 28, 2015
218a49d
Applied Xiangrui's comments - especially removing RDD/PICLinalg class…
sboeschhuawei Jan 28, 2015
1c3a62e
removed matplot.py and reordered all private methods to bottom of PIC
sboeschhuawei Jan 28, 2015
121e4d5
Remove unused testing data files
sboeschhuawei Jan 28, 2015
7ebd149
Incorporate Xiangrui's first set of PR comments except restructure PI…
sboeschhuawei Jan 28, 2015
92d4752
Move the Guassian/ Affinity matrix calcs out of PIC. Presently in the…
sboeschhuawei Jan 29, 2015
c12dfc8
Removed examples files and added pic_data.txt. Revamped testcases yet…
sboeschhuawei Jan 29, 2015
24fbf52
Updated API to be similar to KMeans plus other changes requested by X…
sboeschhuawei Jan 30, 2015
4b78aaf
refactor PIC
mengxr Jan 30, 2015
f292f31
Merge pull request #44 from mengxr/SPARK-4259
sboeschhuawei Jan 30, 2015
4550850
Removed pic test data
sboeschhuawei Jan 30, 2015
20 changes: 20 additions & 0 deletions docs/mllib-clustering.md
@@ -34,6 +34,26 @@ a given dataset, the algorithm returns the best clustering result).
* *initializationSteps* determines the number of steps in the k-means\|\| algorithm.
* *epsilon* determines the distance threshold within which we consider k-means to have converged.

### Power Iteration Clustering

Power iteration clustering is a scalable and efficient algorithm for clustering points given pointwise mutual affinity values. Internally the algorithm:

* accepts a [Graph](api/graphx/index.html#org.apache.spark.graphx.Graph) that represents a normalized pairwise affinity between all input points.

* calculates the principal eigenvalue and eigenvector
* clusters each input point according to its principal eigenvector component value

Details of this algorithm are found in [Power Iteration Clustering, Lin and Cohen](http://www.icml2010.org/papers/387.pdf).


Example output for a dataset inspired by the paper, but with five clusters instead of three, is shown below for our implementation:

<p style="text-align: center;">
<img src="img/PIClusteringFiveCirclesInputsAndOutputs.png"
title="The Property Graph"
alt="The Property Graph"
width="50%" />
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>
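The core update in the bullets above can be sketched without Spark: multiply the normalized affinity matrix `W` by the current vector, then renormalize to unit 1-norm. A minimal plain-Scala sketch, using the 4-node example matrix from this PR's test suite (`step` is a hypothetical helper for illustration, not part of the API):

```scala
// Sketch (not the Spark implementation): one PIC power-iteration step
// v' = W v / ||W v||_1 on a small dense, row-normalized matrix W.
val w: Array[Array[Double]] = Array(
  Array(0.0, 1.0 / 3, 1.0 / 3, 1.0 / 3),
  Array(0.5, 0.0, 0.5, 0.0),
  Array(1.0 / 3, 1.0 / 3, 0.0, 1.0 / 3),
  Array(0.5, 0.0, 0.5, 0.0))

def step(v: Array[Double]): Array[Double] = {
  // multiply W by v
  val wv = w.map(row => row.zip(v).map { case (a, b) => a * b }.sum)
  // renormalize to unit 1-norm
  val norm = wv.map(math.abs).sum
  wv.map(_ / norm)
}
```

Iterating `step` a few times separates the components along the pseudo-eigenvector, which is what the final k-means step clusters.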

### Examples

<div class="codetabs">
5 changes: 5 additions & 0 deletions mllib/pom.xml
@@ -50,6 +50,11 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.clustering

import org.apache.spark.{Logging, SparkException}
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.util.random.XORShiftRandom

/**
* Model produced by [[PowerIterationClustering]].
*
* @param k number of clusters
* @param assignments an RDD of (vertexID, clusterID) pairs
*/
class PowerIterationClusteringModel(
val k: Int,
val assignments: RDD[(Long, Int)]) extends Serializable

/**
* Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by Lin and
* Cohen (see http://www.icml2010.org/papers/387.pdf). From the abstract: PIC finds a very
* low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise
* similarity matrix of the data.
*
* @param k Number of clusters.
* @param maxIterations Maximum number of iterations of the PIC algorithm.
*/
class PowerIterationClustering private[clustering] (
private var k: Int,
private var maxIterations: Int) extends Serializable {

import org.apache.spark.mllib.clustering.PowerIterationClustering._

/** Constructs a PIC instance with default parameters: {k: 2, maxIterations: 100}. */
def this() = this(k = 2, maxIterations = 100)

/**
* Set the number of clusters.
*/
def setK(k: Int): this.type = {
this.k = k
this
}

/**
* Set maximum number of iterations of the power iteration loop
*/
def setMaxIterations(maxIterations: Int): this.type = {
this.maxIterations = maxIterations
this
}

/**
* Run the PIC algorithm.
*
* @param similarities an RDD of (i, j, s_ij_) tuples representing the affinity matrix, which is
* the matrix A in the PIC paper. The similarity s_ij_ must be nonnegative.
* This is a symmetric matrix and hence s_ij_ = s_ji_. For any (i, j) with
* nonzero similarity, there should be either (i, j, s_ij_) or (j, i, s_ji_)
* in the input. Tuples with i = j are ignored, because we assume s_ij_ = 0.0.
*
* @return a [[PowerIterationClusteringModel]] that contains the clustering result
*/
def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel = {
val w = normalize(similarities)
val w0 = randomInit(w)
pic(w0)
}

/**
* Runs the PIC algorithm.
*
* @param w The normalized affinity matrix, which is the matrix W in the PIC paper with
* w_ij_ = a_ij_ / d_ii_ as its edge properties and the initial vector of the power
* iteration as its vertex properties.
*/
private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
val v = powerIter(w, maxIterations)
val assignments = kMeans(v, k)
new PowerIterationClusteringModel(k, assignments)
}
}

private[clustering] object PowerIterationClustering extends Logging {
/**
* Normalizes the affinity matrix (A) by row sums and returns the normalized affinity matrix (W).
*/
def normalize(similarities: RDD[(Long, Long, Double)]): Graph[Double, Double] = {
val edges = similarities.flatMap { case (i, j, s) =>
if (s < 0.0) {
throw new SparkException(s"Similarity must be nonnegative but found s($i, $j) = $s.")
}
if (i != j) {
Seq(Edge(i, j, s), Edge(j, i, s))
} else {
None
}
}
val gA = Graph.fromEdges(edges, 0.0)
val vD = gA.aggregateMessages[Double](
sendMsg = ctx => {
ctx.sendToSrc(ctx.attr)
},
mergeMsg = _ + _,
TripletFields.EdgeOnly)
GraphImpl.fromExistingRDDs(vD, gA.edges)
.mapTriplets(
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
TripletFields.Src)
}

/**
* Generates random vertex properties (v0) to start power iteration.
*
* @param g a graph representing the normalized affinity matrix (W)
* @return a graph with edges representing W and vertices representing a random vector
* with unit 1-norm
*/
def randomInit(g: Graph[Double, Double]): Graph[Double, Double] = {
val r = g.vertices.mapPartitionsWithIndex(
(part, iter) => {
val random = new XORShiftRandom(part)
iter.map { case (id, _) =>
(id, random.nextGaussian())
}
}, preservesPartitioning = true).cache()
val sum = r.values.map(math.abs).sum()
val v0 = r.mapValues(x => x / sum)
GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
}

/**
* Runs power iteration.
* @param g input graph with edges representing the normalized affinity matrix (W) and vertices
* representing the initial vector of the power iterations.
* @param maxIterations maximum number of iterations
* @return a [[VertexRDD]] representing the pseudo-eigenvector
*/
def powerIter(
g: Graph[Double, Double],
maxIterations: Int): VertexRDD[Double] = {
// the default tolerance used in the PIC paper, with a lower bound 1e-8
val tol = math.max(1e-5 / g.vertices.count(), 1e-8)
var prevDelta = Double.MaxValue
var diffDelta = Double.MaxValue
var curG = g
for (iter <- 0 until maxIterations if math.abs(diffDelta) > tol) {
val msgPrefix = s"Iteration $iter"
// multiply W by vt
val v = curG.aggregateMessages[Double](
sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
mergeMsg = _ + _,
TripletFields.Dst).cache()
// normalize v
val norm = v.values.map(math.abs).sum()
logInfo(s"$msgPrefix: norm(v) = $norm.")
val v1 = v.mapValues(x => x / norm)
// compare difference
val delta = curG.joinVertices(v1) { case (_, x, y) =>
math.abs(x - y)
}.vertices.values.sum()
logInfo(s"$msgPrefix: delta = $delta.")
diffDelta = math.abs(delta - prevDelta)
logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
// update v
curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
prevDelta = delta
}
curG.vertices
}

/**
* Runs k-means clustering.
* @param v a [[VertexRDD]] representing the pseudo-eigenvector
* @param k number of clusters
* @return a [[VertexRDD]] representing the clustering assignments
*/
def kMeans(v: VertexRDD[Double], k: Int): VertexRDD[Int] = {
val points = v.mapValues(x => Vectors.dense(x)).cache()
val model = new KMeans()
.setK(k)
.setRuns(5)
.setSeed(0L)
.run(points.values)
points.mapValues(p => model.predict(p)).cache()
}
}
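The `normalize` method above builds W = D⁻¹A: it symmetrizes the input triples, computes each row sum d_ii via `aggregateMessages`, and divides each edge weight by its source row sum. A plain-Scala sketch of the same computation without Spark/GraphX (`normalizeLocal` is a hypothetical helper for illustration):

```scala
// Sketch (plain Scala, no Spark/GraphX): row-normalize a sparse symmetric
// affinity matrix given as (i, j, s) triples, mirroring normalize().
def normalizeLocal(similarities: Seq[(Long, Long, Double)]): Map[(Long, Long), Double] = {
  require(similarities.forall(_._3 >= 0.0), "Similarity must be nonnegative.")
  // symmetrize: each (i, j, s) with i != j contributes both directions
  val edges = similarities.flatMap { case (i, j, s) =>
    if (i != j) Seq((i, j, s), (j, i, s)) else Nil
  }
  // d_ii is the sum of row i; w_ij = a_ij / d_ii
  val rowSums = edges.groupBy(_._1).map { case (i, es) => i -> es.map(_._3).sum }
  edges.map { case (i, j, s) => (i, j) -> s / rowSums(i) }.toMap
}
```

On the 4-node graph from the test suite below, this reproduces the expected W with rows summing to 1.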
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.clustering

import scala.collection.mutable

import org.scalatest.FunSuite

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._

class PowerIterationClusteringSuite extends FunSuite with MLlibTestSparkContext {

import org.apache.spark.mllib.clustering.PowerIterationClustering._

test("power iteration clustering") {
/*
We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
edge (3, 4).

15-14 -13 -12
| |
4 . 3 - 2 11
| | x | |
5 0 - 1 10
| |
6 - 7 - 8 - 9
*/

val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
(1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
(4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
(10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
val model = new PowerIterationClustering()
.setK(2)
.run(sc.parallelize(similarities, 2))
val predictions = Array.fill(2)(mutable.Set.empty[Long])
model.assignments.collect().foreach { case (i, c) =>
predictions(c) += i
}
assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
}

test("normalize and powerIter") {
/*
Test normalize() with the following graph:

0 - 3
| \ |
1 - 2

The affinity matrix (A) is

0 1 1 1
1 0 1 0
1 1 0 1
1 0 1 0

D is diag(3, 2, 3, 2) and hence W is

0 1/3 1/3 1/3
1/2 0 1/2 0
1/3 1/3 0 1/3
1/2 0 1/2 0
*/
val similarities = Seq[(Long, Long, Double)](
(0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0), (2, 3, 1.0))
val expected = Array(
Array(0.0, 1.0/3.0, 1.0/3.0, 1.0/3.0),
Array(1.0/2.0, 0.0, 1.0/2.0, 0.0),
Array(1.0/3.0, 1.0/3.0, 0.0, 1.0/3.0),
Array(1.0/2.0, 0.0, 1.0/2.0, 0.0))
val w = normalize(sc.parallelize(similarities, 2))
w.edges.collect().foreach { case Edge(i, j, x) =>
assert(x ~== expected(i.toInt)(j.toInt) absTol 1e-14)
}
val v0 = sc.parallelize(Seq[(Long, Double)]((0, 0.1), (1, 0.2), (2, 0.3), (3, 0.4)), 2)
val w0 = Graph(v0, w.edges)
val v1 = powerIter(w0, maxIterations = 1).collect()
val u = Array(0.3, 0.2, 0.7/3.0, 0.2)
val norm = u.sum
val u1 = u.map(x => x / norm)
v1.foreach { case (i, x) =>
assert(x ~== u1(i.toInt) absTol 1e-14)
}
}
}
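The loop in `powerIter` does not stop when delta (the total elementwise change between iterations) is small, but when delta itself stabilizes, i.e. when |delta − prevDelta| drops below tol. A plain-Scala sketch of that loop on a dense matrix (`powerIterLocal` is a hypothetical helper, not the GraphX implementation):

```scala
// Sketch (plain Scala): the powerIter loop on a dense matrix. Stops when
// |delta - prevDelta| falls below tol, mirroring the acceleration-based
// stopping rule in PowerIterationClustering.powerIter.
def powerIterLocal(
    w: Array[Array[Double]],
    v0: Array[Double],
    maxIterations: Int): Array[Double] = {
  val tol = math.max(1e-5 / v0.length, 1e-8)
  var prevDelta = Double.MaxValue
  var diffDelta = Double.MaxValue
  var v = v0
  var iter = 0
  while (iter < maxIterations && math.abs(diffDelta) > tol) {
    // multiply W by v, then renormalize to unit 1-norm
    val wv = w.map(row => row.zip(v).map { case (a, b) => a * b }.sum)
    val norm = wv.map(math.abs).sum
    val v1 = wv.map(_ / norm)
    // delta = total elementwise change between iterations
    val delta = v.zip(v1).map { case (x, y) => math.abs(x - y) }.sum
    diffDelta = math.abs(delta - prevDelta)
    prevDelta = delta
    v = v1
    iter += 1
  }
  v
}
```

Stopping on the change in delta (rather than delta itself) follows the PIC paper: the vector keeps shifting slowly along the dominant eigendirection, so delta plateaus rather than vanishing.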