Move the Gaussian/Affinity matrix calcs out of PIC. Presently in the test suite
sboeschhuawei committed Jan 29, 2015
1 parent 7ebd149 commit 92d4752
Showing 4 changed files with 337 additions and 213 deletions.
@@ -0,0 +1,40 @@
package org.apache.spark.examples.mllib

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.clustering.PowerIterationClustering
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}
import scopt.OptionParser
import breeze.linalg.{DenseVector => BDV}

import scala.util.Random

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


/**
* An example Power Iteration Clustering app. Run with
* {{{
* ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample [options] <input>
* }}}
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object PowerIterationClusteringExample {


}
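The PowerIterationClusteringExample object is added empty in this commit. Below is a minimal, non-authoritative sketch of a driver it might grow into; the toy affinity values and the leading SparkContext argument to run are assumptions, since the full run signature is only partly visible in the diff of the third file further down.

// Hypothetical driver sketch -- not part of this commit.
def sketchMain(): Unit = {
  import org.apache.spark.graphx.{Edge, Graph}
  val sc = new SparkContext(new SparkConf().setAppName("PowerIterationClusteringExample"))
  // Toy symmetric affinity graph over three vertices (assumed input).
  val edges = sc.parallelize(Seq(
    Edge(0L, 1L, 0.9), Edge(1L, 0L, 0.9),
    Edge(1L, 2L, 0.1), Edge(2L, 1L, 0.1)))
  val vertices = sc.parallelize(Seq((0L, 1.0), (1L, 1.0), (2L, 1.0)))
  val graph: Graph[Double, Double] = Graph(vertices, edges)
  // run(...) per the trimmed signature shown later in this diff; the leading
  // SparkContext argument is inferred from the method body's use of sc.
  val (centers, assignments) = PowerIterationClustering.run(sc, graph, 2, 20)
  assignments.foreach(println)
  sc.stop()
}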
@@ -0,0 +1,88 @@
package org.apache.spark.examples.mllib

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.clustering.PowerIterationClustering
import scopt.OptionParser

import scala.util.Random

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


/**
* An example Power Iteration Clustering app for Gaussian similarity inputs. Run with
* {{{
* ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringForGaussian [options] <input>
* }}}
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object PowerIterationClusteringForGaussian {


val PIC = PowerIterationClustering

case class Params(
input: String = null,
k: Int = -1,
numIterations: Int = PIC.defaultIterations
) extends AbstractParams[Params]

def main(args: Array[String]) {
val defaultParams = Params()

val parser = new OptionParser[Params]("PowerIterationClusteringForGaussian") {
head("Power Iteration Clustering for Gaussian Similarity inputs.")
opt[Int]('k', "k")
.required()
.text(s"number of clusters, required")
.action((x, c) => c.copy(k = x))
opt[Int]("numIterations")
.text(s"number of iterations, default; ${defaultParams.numIterations}")
.action((x, c) => c.copy(numIterations = x))
arg[String]("<input>")
.text("input paths to examples")
.required()
.action((x, c) => c.copy(input = x))
}

parser.parse(args, defaultParams).map { params =>
run(params)
}.getOrElse {
sys.exit(1)
}
}

def run(params: Params) {
val conf = new SparkConf().setAppName(s"PowerIterationClusteringForGaussian with $params")
val sc = new SparkContext(conf)

Logger.getRootLogger.setLevel(Level.WARN)

val examples = sc.textFile(params.input).map { line =>
Vectors.dense(line.split(' ').map(_.toDouble))
}.cache()

val numExamples = examples.count()


sc.stop()
}

}
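The run method above currently stops right after counting the parsed vectors. Below is a hedged sketch of how it might continue, building the Gaussian affinity edges inline; the kernel mirrors the gaussianDist formula being moved out of PIC in the third file, while the sigma value, the 1e-11 cutoff, and the driver-side collect are illustrative assumptions that only suit small inputs.

// Hypothetical continuation of run(params) -- not part of this commit.
import org.apache.spark.graphx.{Edge, Graph}
import breeze.linalg.{DenseVector => BDV, norm}

val sigma = 1.0
// Pair each parsed vector with a Long vertex id (collects to the driver: small inputs only).
val pts: Array[(Long, BDV[Double])] = examples.collect().zipWithIndex.map {
  case (v, i) => (i.toLong, new BDV(v.toArray))
}
// Gaussian kernel exp(-|xi - xj|^2 / (2*sigma^2)), dropping near-zero affinities.
val edges = for {
  (idI, vi) <- pts
  (idJ, vj) <- pts
  if idI != idJ
  w = math.exp(-0.5 / (sigma * sigma) * math.pow(norm(vi - vj), 2))
  if w > 1e-11
} yield Edge(idI, idJ, w)
val graph: Graph[Double, Double] =
  Graph(sc.parallelize(pts.map { case (id, _) => (id, 1.0) }), sc.parallelize(edges))
// graph can then be handed to PowerIterationClustering.run (see the third file below).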
@@ -44,19 +44,11 @@ object PowerIterationClustering
type DGraph = Graph[Double, Double]
type IndexedVector[Double] = (Long, BDV[Double])


// Terminate iteration when norm changes by less than this value
private[mllib] val defaultMinNormChange: Double = 1e-11

// Default sigma for Gaussian Distance calculations
private[mllib] val defaultSigma = 1.0
val defaultMinNormChange: Double = 1e-11

// Default number of iterations for PIC loop
private[mllib] val defaultIterations: Int = 20

// Default minimum affinity between points - lower than this it is considered
// zero and no edge will be created
private[mllib] val defaultMinAffinity = 1e-11
val defaultIterations: Int = 20

// Do not allow divide by zero: change to this value instead
val defaultDivideByZeroVal: Double = 1e-15
@@ -73,11 +65,6 @@ object PowerIterationClustering
* @param nClusters Number of clusters to create
* @param nIterations Number of iterations of the PIC algorithm
* that calculates primary PseudoEigenvector and Eigenvalue
* @param sigma Sigma for Gaussian distribution calculation according to
* [1/2*sqrt(pi*sigma)] * exp(-(x-y)**2 / (2*sigma**2))
* @param minAffinity Minimum Affinity between two Points in the input dataset: below
* this threshold the affinity will be considered "close to" zero and
* no Edge will be created between those Points in the sparse matrix
* @param nRuns Number of runs for the KMeans clustering
* @return Tuple of (Seq[(Cluster Id,Cluster Center)],
* Seq[(VertexId, ClusterID Membership)])
@@ -86,8 +73,6 @@
G: Graph[Double, Double],
nClusters: Int,
nIterations: Int = defaultIterations,
sigma: Double = defaultSigma,
minAffinity: Double = defaultMinAffinity,
nRuns: Int = defaultKMeansRuns)
: (Seq[(Int, Vector)], Seq[((VertexId, Vector), Int)]) = {
val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations)
@@ -122,45 +107,6 @@
(ccs, estCollected)
}
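With sigma and minAffinity removed from this signature, callers now pass only a pre-built affinity graph plus the clustering parameters. A hedged call sketch follows: affinityGraph is a hypothetical Graph[Double, Double], and the leading SparkContext argument is inferred from the method body's use of sc, since the start of the signature is elided in this hunk.

val (clusterCenters, memberships) =
  PowerIterationClustering.run(sc, affinityGraph, nClusters = 3, nIterations = 20)
// clusterCenters: Seq[(Int, Vector)]             -- (cluster id, cluster center)
// memberships:    Seq[((VertexId, Vector), Int)] -- ((vertex, eigenvector component), cluster id)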

/**
*
* Create an affinity matrix
*
* @param sc Spark Context
* @param points Input Points in format of [(VertexId,(x,y))]
* where VertexId is a Long
* @param sigma Sigma for Gaussian distribution calculation according to
* [1/2*sqrt(pi*sigma)] * exp(-(x-y)**2 / (2*sigma**2))
* @param minAffinity Minimum Affinity between two Points in the input dataset: below
* this threshold the affinity will be considered "close to" zero and
* no Edge will be created between those Points in the sparse matrix
* @return The normalized affinity matrix as a Graph[Double, Double]
*/
def createGaussianAffinityMatrix(sc: SparkContext,
points: Points,
sigma: Double = defaultSigma,
minAffinity: Double = defaultMinAffinity)
: Graph[Double, Double] = {
val vidsRdd = sc.parallelize(points.map(_._1).sorted)
val nVertices = points.length

val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
if (logger.isDebugEnabled) {
logger.debug(s"Vt(0)=${
printVector(new BDV(initialVt.map {
_._2
}.toArray))
}")
}
val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
if (logger.isDebugEnabled) {
logger.debug(printMatrixFromEdges(G.edges))
}
G
}
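Per the commit message this helper is being moved out of PIC into the test suite. For reference, a hedged sketch of how it was invoked before the move; the Points alias is not visible in this diff, so (Long, breeze DenseVector) pairs are assumed.

// Illustrative only; createGaussianAffinityMatrix no longer lives in PowerIterationClustering.
val points = Seq(
  (0L, new BDV(Array(0.0, 0.0))),
  (1L, new BDV(Array(0.1, 0.1))),
  (2L, new BDV(Array(5.0, 5.0))))
val affinityGraph = createGaussianAffinityMatrix(sc, points, sigma = 1.0, minAffinity = 1e-11)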

/**
* Create a Graph given an initial Vt0 and a set of Edges that
@@ -270,154 +216,5 @@
initialVt
}

/**
* Calculate the Gaussian distance between two Vectors according to:
*
* exp(-(X1-X2)^2 / (2*sigma^2))
*
* where X1 and X2 are Vectors
*
* @param vect1 Input Vector1
* @param vect2 Input Vector2
* @param sigma Gaussian parameter sigma
* @return Gaussian kernel value exp(-(vect1-vect2)^2 / (2*sigma^2))
*/
private[mllib] def gaussianDist(vect1: BDV[Double], vect2: BDV[Double], sigma: Double) = {
val c1c2 = vect1.toArray.zip(vect2.toArray)
val dist = Math.exp((-0.5 / Math.pow(sigma, 2.0)) * c1c2.foldLeft(0.0) {
case (dist: Double, (c1: Double, c2: Double)) =>
dist + Math.pow(c1 - c2, 2)
})
dist
}
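A small worked example of the kernel as implemented above (values rounded): identical points map to 1.0, and larger distances or smaller sigma push the similarity toward zero.

val a = new BDV(Array(0.0, 0.0))
val b = new BDV(Array(1.0, 1.0))
gaussianDist(a, a, sigma = 1.0)   // exp(0)        = 1.0
gaussianDist(a, b, sigma = 1.0)   // exp(-0.5 * 2) = exp(-1) ~ 0.368
gaussianDist(a, b, sigma = 0.5)   // exp(-2.0 * 2) = exp(-4) ~ 0.018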

/**
* Create a sparse EdgeRDD from an array of dense vectors. The elements that
* are "close to" zero - as configured by the minAffinity value - do not
* result in an Edge being created.
*
* @param sc Spark Context
* @param wRdd RDD of (VertexId, dense row vector) pairs
* @param minAffinity Affinity threshold below which no Edge is created
* @return RDD of Edges whose absolute weight is at or above minAffinity
*/
private[mllib] def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector[Double]],
minAffinity: Double = defaultMinAffinity) = {
val labels = wRdd.map { case (vid, vect) => vid}.collect
val edgesRdd = wRdd.flatMap { case (vid, vect) =>
for ((dval, ix) <- vect.toArray.zipWithIndex
if Math.abs(dval) >= minAffinity)
yield Edge(vid, labels(ix), dval)
}
edgesRdd
}
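To see the thresholding in action, a small sketch (callable from mllib-scoped test code, since the method is private[mllib]; vertex ids and weights are illustrative):

val w = sc.parallelize(Seq(
  (0L, new BDV(Array(0.0, 0.7))),
  (1L, new BDV(Array(0.7, 1e-13)))))
val edges = createSparseEdgesRdd(sc, w, minAffinity = 1e-11)
// Kept: Edge(0, 1, 0.7) and Edge(1, 0, 0.7); the 0.0 and 1e-13 entries produce no edges.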

/**
* Create the normalized affinity matrix "W" given a set of Points
*
* @param sc SparkContext
* @param points Input Points in format of [(VertexId,(x,y))]
* where VertexId is a Long
* @param sigma Gaussian parameter sigma
* @return Tuple of (row-normalized affinity RDD "W", materialized array of row sums)
*/
private[mllib] def createNormalizedAffinityMatrix(sc: SparkContext,
points: Points, sigma: Double) = {
val nVertices = points.length
val affinityRddNotNorm = sc.parallelize({
val ivect = new Array[IndexedVector[Double]](nVertices)
for (i <- 0 until points.size) {
ivect(i) = new IndexedVector(points(i)._1, new BDV(Array.fill(nVertices)(100.0)))
for (j <- 0 until points.size) {
val dist = if (i != j) {
gaussianDist(points(i)._2, points(j)._2, sigma)
} else {
0.0
}
ivect(i)._2(j) = dist
}
}
ivect.zipWithIndex.map { case (vect, ix) =>
(ix, vect)
}
}, nVertices)
if (logger.isDebugEnabled) {
logger.debug(s"Affinity:\n${
printMatrix(affinityRddNotNorm.map(_._2), nVertices, nVertices)
}")
}
val rowSums = affinityRddNotNorm.map { case (ix, (vid, vect)) =>
vect.foldLeft(0.0) {
_ + _
}
}
val materializedRowSums = rowSums.collect
val similarityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) =>
(vid, vect.map {
_ / materializedRowSums(rowx)
})
}
if (logger.isDebugEnabled) {
logger.debug(s"W:\n${printMatrix(similarityRdd, nVertices, nVertices)}")
}
(similarityRdd, materializedRowSums)
}
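A worked toy example of the row normalization this method performs, on an illustrative 3-point input (the diagonal is forced to 0.0 as in the loop above):

//   raw affinities A  = [[0.0,   0.6,   0.2 ],
//                        [0.6,   0.0,   0.4 ],
//                        [0.2,   0.4,   0.0 ]]
//   row sums          = [0.8, 1.0, 0.6]
//   W = D^-1 * A      = [[0.0,   0.75,  0.25],
//                        [0.6,   0.0,   0.4 ],
//                        [0.333, 0.667, 0.0 ]]
// Each row of W sums to 1, giving the row-normalized matrix whose dominant
// pseudo-eigenvector the power iteration estimates.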

private[mllib] def printMatrix(denseVectorRDD: RDD[LabeledPoint], i: Int, i1: Int) = {
denseVectorRDD.collect.map {
case (vid, dvect) => dvect.toArray
}.flatten
}

private[mllib] def printMatrixFromEdges(edgesRdd: EdgeRDD[_]) = {
val edgec = edgesRdd.collect
val sorted = edgec.sortWith { case (e1, e2) =>
e1.srcId < e2.srcId || (e1.srcId == e2.srcId && e1.dstId <= e2.dstId)
}

}

private[mllib] def makeNonZero(dval: Double, tol: Double = defaultDivideByZeroVal) = {
if (Math.abs(dval) < tol) {
Math.signum(dval) * tol
} else {
dval
}
}

private[mllib] def printMatrix(mat: BDM[Double]): String
= printMatrix(mat, mat.rows, mat.cols)

private[mllib] def printMatrix(mat: BDM[Double], numRows: Int, numCols: Int): String
= printMatrix(mat.toArray, numRows, numCols)

private[mllib] def printMatrix(vectors: Array[BDV[Double]]): String = {
printMatrix(vectors.map {
_.toArray
}.flatten, vectors.length, vectors.length)
}

private[mllib] def printMatrix(vect: Array[Double], numRows: Int, numCols: Int): String = {
val darr = vect
val stride = darr.length / numCols
val sb = new StringBuilder
def leftJust(s: String, len: Int) = {
" ".substring(0, len - Math.min(len, s.length)) + s
}

assert(darr.length == numRows * numCols,
s"Input array is not correct length (${darr.length}) given #rows/cols=$numRows/$numCols")
for (r <- 0 until numRows) {
for (c <- 0 until numCols) {
sb.append(leftJust(f"${darr(r * stride + c)}%.6f", 9) + " ")
}
sb.append("\n")
}
sb.toString
}

private[mllib] def printVector(dvect: BDV[Double]) = {
dvect.toArray.mkString(",")
}

}