Incorporate Xiangrui's first set of PR comments except restructure PIC.run to take Graph but do not remove Gaussian
sboeschhuawei committed Jan 28, 2015
1 parent 121e4d5 commit 7ebd149
Showing 3 changed files with 65 additions and 138 deletions.
@@ -35,7 +35,7 @@ import scala.language.existentials
* representation. The resulting pseudo-eigenvector provides effective clustering - as
* performed by Parallel KMeans.
*/
object PIClustering {
object PowerIterationClustering {

private val logger = Logger.getLogger(getClass.getName())

@@ -44,32 +44,32 @@ object PIClustering {
type DGraph = Graph[Double, Double]
type IndexedVector[Double] = (Long, BDV[Double])


// Terminate iteration when norm changes by less than this value
private[mllib] val DefaultMinNormChange: Double = 1e-11
private[mllib] val defaultMinNormChange: Double = 1e-11

// Default σ for Gaussian Distance calculations
private[mllib] val DefaultSigma = 1.0
// Default sigma for Gaussian Distance calculations
private[mllib] val defaultSigma = 1.0

// Default number of iterations for PIC loop
private[mllib] val DefaultIterations: Int = 20
private[mllib] val defaultIterations: Int = 20

// Default minimum affinity between points - below this value the affinity is
// considered zero and no edge will be created
private[mllib] val DefaultMinAffinity = 1e-11
private[mllib] val defaultMinAffinity = 1e-11

// Do not allow divide by zero: change to this value instead
val DefaultDivideByZeroVal: Double = 1e-15
val defaultDivideByZeroVal: Double = 1e-15

// Default number of runs by the KMeans.run() method
val DefaultKMeansRuns = 10
val defaultKMeansRuns = 10

/**
*
* Run a Power Iteration Clustering
*
* @param sc Spark Context
* @param points Input Points in format of [(VertexId,(x,y)]
* where VertexId is a Long
* @param G Affinity Matrix in a Sparse Graph structure
* @param nClusters Number of clusters to create
* @param nIterations Number of iterations of the PIC algorithm
* that calculates primary PseudoEigenvector and Eigenvalue
@@ -83,30 +83,13 @@ object PIClustering {
* Seq[(VertexId, ClusterID Membership)]
*/
def run(sc: SparkContext,
points: Points,
G: Graph[Double, Double],
nClusters: Int,
nIterations: Int = DefaultIterations,
sigma: Double = DefaultSigma,
minAffinity: Double = DefaultMinAffinity,
nRuns: Int = DefaultKMeansRuns)
nIterations: Int = defaultIterations,
sigma: Double = defaultSigma,
minAffinity: Double = defaultMinAffinity,
nRuns: Int = defaultKMeansRuns)
: (Seq[(Int, Vector)], Seq[((VertexId, Vector), Int)]) = {
val vidsRdd = sc.parallelize(points.map(_._1).sorted)
val nVertices = points.length

val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
if (logger.isDebugEnabled) {
logger.debug(s"Vt(0)=${
printVector(new BDV(initialVt.map {
_._2
}.toArray))
}")
}
val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
if (logger.isDebugEnabled) {
logger.debug(printMatrixFromEdges(G.edges))
}
val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations)
// TODO: avoid local collect and then sc.parallelize.
val localVt = vt.collect.sortBy(_._1)
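To make the new call pattern concrete, here is a hypothetical caller-side sketch of the restructured API (not part of this commit): the caller builds the affinity graph first, using the Gaussian helper retained further down in this diff, and then hands it to run. The point data, cluster count, and iteration count are placeholder assumptions, and Points is assumed to be a sequence of (VertexId, Breeze vector) pairs, as the test suite suggests.

```scala
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.SparkContext

// Hypothetical usage sketch of the restructured API (assumed types and values).
def clusterPoints(sc: SparkContext, points: Seq[(Long, BDV[Double])]): Unit = {
  // Build the sparse Gaussian affinity graph, then run PIC on it.
  val g = PowerIterationClustering.createGaussianAffinityMatrix(sc, points)
  val (centers, assignments) =
    PowerIterationClustering.run(sc, g, nClusters = 3, nIterations = 20)
  // assignments is a Seq[((VertexId, Vector), Int)]: each vertex id plus its cluster id.
  assignments.foreach { case ((vid, _), clusterId) => println(s"$vid -> $clusterId") }
}
```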
@@ -140,36 +123,43 @@ object PIClustering {
}

/**
* Read Points from an input file in the following format:
* Vertex1Id Coord11 Coord12 CoordX13 .. Coord1D
* Vertex2Id Coord21 Coord22 CoordX23 .. Coord2D
* ..
* VertexNId CoordN1 CoordN2 CoordN23 .. CoordND
*
* Where N is the number of observations, each a D-dimension point
*
* E.g.
* Create an affinity matrix
*
* 19 1.8035177495 0.7460582552 0.2361611395 -0.8645567427 -0.8613062
* 10 0.5534111111 1.0456386879 1.7045663273 0.7281759816 1.0807487792
* 911 1.200749626 1.8962364439 2.5117192131 -0.4034737281 -0.9069696484
*
* Which represents three 5-dimensional input Points with VertexIds 19,10, and 911
* @param verticesFile Local filesystem path to the Points input file
* @return Set of Vertices in format appropriate for consumption by the PIC algorithm
* @param sc Spark Context
* @param points Input Points in format of [(VertexId, (x, y))]
*               where VertexId is a Long
* @param sigma Sigma for the Gaussian distance calculation according to
*              [1/2 * sqrt(pi * sigma)] * exp(-(x - y)**2 / (2 * sigma**2))
* @param minAffinity Minimum affinity between two Points in the input dataset: below
*                    this threshold the affinity is considered "close to" zero and
*                    no Edge will be created between those Points in the sparse matrix
* @return Affinity Matrix in a sparse Graph structure
*/
def readVerticesfromFile(verticesFile: String): Points = {

import scala.io.Source
val vertices = Source.fromFile(verticesFile).getLines.map { l =>
val toks = l.split("\t")
val arr = new BDV(toks.slice(1, toks.length).map(_.toDouble))
(toks(0).toLong, arr)
}.toSeq
def createGaussianAffinityMatrix(sc: SparkContext,
points: Points,
sigma: Double = defaultSigma,
minAffinity: Double = defaultMinAffinity)
: Graph[Double, Double] = {
val vidsRdd = sc.parallelize(points.map(_._1).sorted)
val nVertices = points.length

val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
if (logger.isDebugEnabled) {
logger.debug(s"Vt(0)=${
printVector(new BDV(initialVt.map {
_._2
}.toArray))
}")
}
val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
if (logger.isDebugEnabled) {
logger.debug(s"Read in ${vertices.length} from $verticesFile")
logger.debug(printMatrixFromEdges(G.edges))
}
vertices
G
}
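As an aside, the Gaussian affinity and the minAffinity cut-off described in the scaladoc above can be illustrated with a small self-contained helper. This is a hypothetical sketch of the per-pair computation only (the normalization constant and the row normalization done by createNormalizedAffinityMatrix are deliberately omitted), not the code in this commit:

```scala
import breeze.linalg.{norm, DenseVector => BDV}

// Hypothetical illustration of the Gaussian affinity with the minAffinity cut-off.
// Affinities below minAffinity are treated as zero, so no edge is created for them.
def gaussianAffinity(x: BDV[Double], y: BDV[Double],
                     sigma: Double = 1.0, minAffinity: Double = 1e-11): Option[Double] = {
  val dist = norm(x - y)                                      // Euclidean distance
  val sim = math.exp(-(dist * dist) / (2.0 * sigma * sigma))  // Gaussian similarity
  if (sim >= minAffinity) Some(sim) else None                 // drop near-zero affinities
}
```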

/**
@@ -205,7 +195,7 @@ object PIClustering {
*/
def getPrincipalEigen(sc: SparkContext,
G: DGraph,
nIterations: Int = DefaultIterations,
nIterations: Int = defaultIterations,
optMinNormChange: Option[Double] = None
): (DGraph, Double, VertexRDD[Double]) = {
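For intuition, the per-iteration update that getPrincipalEigen drives on the GraphX graph can be sketched locally on a dense Breeze matrix. This is an illustrative analogue under assumed types, not the distributed implementation in this file:

```scala
import breeze.linalg.{sum, DenseMatrix => BDM, DenseVector => BDV}

// Local sketch of power iteration: repeatedly multiply by the (normalized) affinity
// matrix and rescale, so v converges toward the principal pseudo-eigenvector.
def powerIterate(w: BDM[Double], v0: BDV[Double], nIterations: Int): (Double, BDV[Double]) = {
  var v = v0
  var lambda = 0.0
  for (_ <- 0 until nIterations) {
    val wv = w * v
    lambda = sum(wv.map(math.abs))   // L1 norm of W*v, used as the scale estimate
    v = wv / lambda                  // rescale so the iterate stays bounded
  }
  (lambda, v)
}
```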

@@ -312,7 +302,7 @@
* @return
*/
private[mllib] def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector[Double]],
minAffinity: Double = DefaultMinAffinity) = {
minAffinity: Double = defaultMinAffinity) = {
val labels = wRdd.map { case (vid, vect) => vid}.collect
val edgesRdd = wRdd.flatMap { case (vid, vect) =>
for ((dval, ix) <- vect.toArray.zipWithIndex
@@ -387,7 +377,7 @@

}

private[mllib] def makeNonZero(dval: Double, tol: Double = DefaultDivideByZeroVal) = {
private[mllib] def makeNonZero(dval: Double, tol: Double = defaultDivideByZeroVal) = {
if (Math.abs(dval) < tol) {
Math.signum(dval) * tol
} else {
41 changes: 0 additions & 41 deletions mllib/src/test/resources/log4j.mllib.properties

This file was deleted.

@@ -19,26 +19,24 @@ package org.apache.spark.mllib.clustering

import breeze.linalg.{DenseVector => BDV}
import org.apache.log4j.Logger
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.scalatest.FunSuite

import scala.util.Random

class PIClusteringSuite extends FunSuite with LocalSparkContext {
class PowerIterationClusteringSuite extends FunSuite with MLlibTestSparkContext {

val logger = Logger.getLogger(getClass.getName)

import org.apache.spark.mllib.clustering.PIClusteringSuite._
import org.apache.spark.mllib.clustering.PowerIterationClusteringSuite._

val PIC = PIClustering
val PIC = PowerIterationClustering
val A = Array

test("concentricCirclesTest") {
concentricCirclesTest()
}


def concentricCirclesTest() = {
val sigma = 1.0
val nIterations = 10
@@ -55,23 +53,22 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {

val nClusters = circleSpecs.size
val cdata = createConcentricCirclesData(circleSpecs)
withSpark { sc =>
val vertices = new Random().shuffle(cdata.map { p =>
(p.label, new BDV(Array(p.x, p.y)))
})
val vertices = new Random().shuffle(cdata.map { p =>
(p.label, new BDV(Array(p.x, p.y)))
})

val nVertices = vertices.length
val (ccenters, estCollected) = PIC.run(sc, vertices, nClusters, nIterations)
logger.info(s"Cluster centers: ${ccenters.mkString(",")} " +
s"\nEstimates: ${estCollected.mkString("[", ",", "]")}")
assert(ccenters.size == circleSpecs.length, "Did not get correct number of centers")
val nVertices = vertices.length
val G = PIC.createGaussianAffinityMatrix(sc, vertices)
val (ccenters, estCollected) = PIC.run(sc, G, nClusters, nIterations)
logger.info(s"Cluster centers: ${ccenters.mkString(",")} " +
s"\nEstimates: ${estCollected.mkString("[", ",", "]")}")
assert(ccenters.size == circleSpecs.length, "Did not get correct number of centers")

}
}

}

object PIClusteringSuite {
object PowerIterationClusteringSuite {
val logger = Logger.getLogger(getClass.getName)
val A = Array

@@ -115,26 +112,7 @@ object PIClusteringSuite {
}

def main(args: Array[String]) {
val pictest = new PIClusteringSuite
val pictest = new PowerIterationClusteringSuite
pictest.concentricCirclesTest()
}
}

/**
* Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped
* after each test.
* TODO: import this from the graphx test cases package i.e. may need update to pom.xml
*/
trait LocalSparkContext {
/** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
def withSpark[T](f: SparkContext => T) = {
val conf = new SparkConf()
GraphXUtils.registerKryoClasses(conf)
val sc = new SparkContext("local", "test", conf)
try {
f(sc)
} finally {
sc.stop()
}
}
}
