diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala index ed3f4c955e257..00a96438320d5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala @@ -17,13 +17,15 @@ package org.apache.spark.mllib.clustering +import org.apache.log4j.Logger import org.apache.spark.SparkContext import org.apache.spark.graphx._ -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.rdd.RDD object PIClustering { + private val logger = Logger.getLogger(getClass.getName()) type DVector = Array[Double] type DEdge = Edge[Double] @@ -44,26 +46,73 @@ object PIClustering { val LA = PICLinalg val RDDLA = RDDLinalg - def cluster(sc: SparkContext, - points: Points, - nClusters: Int, - nIterations: Int = DefaultIterations, - sigma: Double = DefaultSigma, - minAffinity: Double = DefaultMinAffinity) = { + /** + * + * @param sc + * @param points + * @param nClusters + * @param nIterations + * @param sigma + * @param minAffinity + * @return Tuple of (Seq[(Cluster Id,Cluster Center)], + * Seq[(VertexId, ClusterID Membership)] + */ + def run(sc: SparkContext, + points: Points, + nClusters: Int, + nIterations: Int = DefaultIterations, + sigma: Double = DefaultSigma, + minAffinity: Double = DefaultMinAffinity) + : (Seq[(Int, Vector)], Seq[(VertexId, Int)]) = { + val vidsRdd = sc.parallelize(points.map(_._1).sorted) val nVertices = points.length val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma) val initialVt = createInitialVector(sc, points.map(_._1), rowSums) - println(s"Vt(0)=${LA.printVector(initialVt.map{_._2}.toArray)}") + if (logger.isDebugEnabled) { + logger.debug(s"Vt(0)=${ + LA.printVector(initialVt.map { + _._2 + }.toArray) + }") + } val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity) val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt)) - println(RDDLA.printMatrixFromEdges(G.edges)) + if (logger.isDebugEnabled) { + logger.debug(RDDLA.printMatrixFromEdges(G.edges)) + } val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations) // TODO: avoid local collect and then sc.parallelize. - val localVect = vt.map{Vectors.dense(_)} - val vectRdd = sc.parallelize(localVect) - val model = KMeans.train(vectRdd, 3, 10) - (model, gUpdated, lambda, vt) + val localVt = vt.collect.sortBy(_._1).map(_._2) + val vectRdd = sc.parallelize(localVt.map(Vectors.dense(_))) + // TODO: what to set nRuns + val nRuns = 10 + vectRdd.cache() + val model = KMeans.train(vectRdd, nClusters, nRuns) + vectRdd.unpersist() + if (logger.isDebugEnabled) { + logger.debug(s"Eigenvalue = $lambda EigenVector: ${localVt.mkString(",")}") + } + val estimates = vidsRdd.zip(model.predict(sc.parallelize(localVt.map { + Vectors.dense(_) + }))) + if (logger.isDebugEnabled) { + logger.debug(s"lambda=$lambda eigen=${localVt.mkString(",")}") + } + val ccs = (0 until model.clusterCenters.length).zip(model.clusterCenters) + if (logger.isDebugEnabled) { + logger.debug(s"Kmeans model cluster centers: ${ccs.mkString(",")}") + } + val pointsMap = Map(points: _*) + val estCollected = estimates.collect.sortBy(_._1) + if (logger.isDebugEnabled) { + // val clusters = estCollected.map(_._2) + // logger.debug(s"Cluster Estimates: ${estCollected.mkString(",")} " + // val counts = Map(estCollected:_*).groupBy(_._1).mapValues(_.size) + // + s" Counts: ${counts.mkString(",")}") + logger.debug(s"Cluster Estimates: ${estCollected.mkString(",")}") + } + (ccs, estCollected) } def createInitialVector(sc: SparkContext, @@ -93,13 +142,11 @@ object PIClustering { } - val printMatrices = true - def getPrincipalEigen(sc: SparkContext, G: DGraph, nIterations: Int = DefaultIterations, optMinNormChange: Option[Double] = None - ): (DGraph, Double, DVector) = { + ): (DGraph, Double, VertexRDD[Double]) = { var priorNorm = Double.MaxValue var norm = Double.MaxValue @@ -120,36 +167,43 @@ object PIClustering { ctx.sendToDst(ctx.attr * ctx.dstAttr) }, _ + _) - println(s"tmpEigen[$iter]: ${tmpEigen.collect.mkString(",")}\n") + if (logger.isDebugEnabled) { + logger.debug(s"tmpEigen[$iter]: ${tmpEigen.collect.mkString(",")}\n") + } val vnorm = - prevG.vertices.map{ _._2}.fold(0.0) { case (sum, dval) => + prevG.vertices.map { + _._2 + }.fold(0.0) { case (sum, dval) => sum + Math.abs(dval) } - println(s"vnorm[$iter]=$vnorm") + if (logger.isDebugEnabled) { + logger.debug(s"vnorm[$iter]=$vnorm") + } outG = prevG.outerJoinVertices(tmpEigen) { case (vid, wval, optTmpEigJ) => val normedEig = optTmpEigJ.getOrElse { - println("We got null estimated eigenvector element"); -1.0 } / vnorm - println(s"Updating vertex[$vid] from $wval to $normedEig") + if (logger.isDebugEnabled) { + logger.debug(s"Updating vertex[$vid] from $wval to $normedEig") + } normedEig } prevG = outG - if (printMatrices) { + if (logger.isDebugEnabled) { val localVertices = outG.vertices.collect val graphSize = localVertices.size print(s"Vertices[$iter]: ${localVertices.mkString(",")}\n") } normVelocity = vnorm - priorNorm normAccel = normVelocity - priorNormVelocity - println(s"normAccel[$iter]= $normAccel") + if (logger.isDebugEnabled) { + logger.debug(s"normAccel[$iter]= $normAccel") + } priorNorm = vnorm priorNormVelocity = vnorm - priorNorm } - (outG, vnorm, outG.vertices.collect.map { - _._2 - }) + (outG, vnorm, outG.vertices) } def scalarDot(d1: DVector, d2: DVector) = { @@ -179,9 +233,9 @@ object PIClustering { val arr = toks.slice(1, toks.length).map(_.toDouble) (toks(0).toLong, arr) }.toSeq - println(s"Read in ${vertices.length} from $verticesFile") - // println(vertices.map { case (x, arr) => s"($x,${arr.mkString(",")})"} - // .mkString("[", ",\n", "]")) + if (logger.isDebugEnabled) { + logger.debug(s"Read in ${vertices.length} from $verticesFile") + } vertices } @@ -224,10 +278,16 @@ object PIClustering { (ix, vect) } }, nVertices) - println(s"Affinity:\n${LA.printMatrix(affinityRddNotNorm.collect.map(_._2._2), - nVertices, nVertices)}") + if (logger.isDebugEnabled) { + logger.debug(s"Affinity:\n${ + LA.printMatrix(affinityRddNotNorm.collect.map(_._2._2), + nVertices, nVertices) + }") + } val rowSums = affinityRddNotNorm.map { case (ix, (vid, vect)) => - vect.foldLeft(0.0){ _ + _} + vect.foldLeft(0.0) { + _ + _ + } } val materializedRowSums = rowSums.collect val similarityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) => @@ -235,7 +295,9 @@ object PIClustering { _ / materializedRowSums(rowx) }) } - println(s"W:\n${LA.printMatrix(similarityRdd.collect.map(_._2), nVertices, nVertices)}") + if (logger.isDebugEnabled) { + logger.debug(s"W:\n${LA.printMatrix(similarityRdd.collect.map(_._2), nVertices, nVertices)}") + } (similarityRdd, materializedRowSums) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala index 17301584bcbde..85eb80481dfcd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala @@ -206,7 +206,7 @@ object RDDLinalg { def printMatrixFromEdges(edgesRdd: EdgeRDD[_]) = { val edgec = edgesRdd.collect - assert(edgec.size < 1e3,"Let us not print a large graph") +// assert(edgec.size < 1e3,"Let us not print a large graph") val sorted = edgec.sortWith { case (e1, e2) => e1.srcId < e2.srcId || (e1.srcId == e2.srcId && e1.dstId <= e2.dstId) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala index 71de2d585c2f5..e354aa0bf662a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala @@ -22,6 +22,8 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.graphx._ import org.scalatest.FunSuite +import scala.util.Random + /** * Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped @@ -96,7 +98,7 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext { // assert(LA.compareVectors(graphInitialVt, initialVtVect)) } val (g2, norm, eigvect) = PIC.getPrincipalEigen(sc, G, nIterations) - println(s"lambda=$norm eigvect=${eigvect.mkString(",")}") + println(s"lambda=$norm eigvect=${eigvect.collect.mkString(",")}") } } @@ -111,12 +113,11 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext { withSpark { sc => val vertices = PIC.readVerticesfromFile(vertFile) val nVertices = vertices.length - val (model, graph, lambda, eigen) = PIC.cluster(sc, vertices, nClusters, + val (ccenters, estimates) = PIC.run(sc, vertices, nClusters, nIterations, sigma) - val collectedRdd = eigen // .collect - println(s"DegreeMatrix:\n${LA.printMatrix(collectedRdd, nVertices, nVertices)}") - println(s"Eigenvalue = $lambda EigenVectors:\n${LA.printMatrix(collectedRdd, nClusters, nVertices)}") -// println(s"Eigenvalues = ${lambdas.mkString(",")} EigenVectors:\n${printMatrix(collectedEigens, nClusters, nVertices)}") +// val collectedRdd = eigen.collect.map{_._2} +// println(s"DegreeMatrix:\n${LA.printMatrix(collectedRdd, nVertices, nVertices)}") +// println(s"Eigenvalue = $lambda EigenVectors:\n${LA.printMatrix(collectedRdd, nClusters, nVertices)}") } } @@ -125,13 +126,14 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext { } def concentricCirclesTest() = { val sigma = 1.0 - val nIterations = 10 - val nClusters = 3 + val nIterations = 20 val circleSpecs = Seq( // Best results for 30 points CircleSpec(Point(0.0,0.0), 0.03, .1, 3), CircleSpec(Point(0.0,0.0), 0.3, .03, 12), - CircleSpec(Point(0.0,0.0), 1.0, .01, 15) + CircleSpec(Point(0.0,0.0), 1.0, .01, 15), + CircleSpec(Point(0.0,0.0), 1.5, .005, 25), + CircleSpec(Point(0.0,0.0), 2.0, .002, 40) // DECENT // CircleSpec(Point(0.0,0.0), 0.1, .1, 5), @@ -143,22 +145,17 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext { // CircleSpec(Point(0.0,0.0), 1.0, .03, 25), // CircleSpec(Point(0.0,0.0), 2.5, .01, 60) ) + val nClusters = circleSpecs.size + val cdata = createConcentricCirclesData(circleSpecs) withSpark { sc => - val vertices = createConcentricCirclesData(circleSpecs).zipWithIndex.map { case (p, ix) => - (ix.toLong, Array(p.x, p.y)) - } + val vertices = new Random().shuffle(cdata.map { p => + (p.label, Array(p.x, p.y)) + }) val nVertices = vertices.length - val (model, graph, lambda, eigen) = PIC.cluster(sc, vertices, nClusters, - nIterations, sigma) - val collectedRdd = eigen // .collect -// println(s"DegreeMatrix:\n${LA.printMatrix(collectedRdd, nVertices, nVertices)}") - println(s"Eigenvalue = $lambda EigenVector: ${collectedRdd.mkString(",")}") - val estimates = model.predict(sc.parallelize(eigen.map{Vectors.dense(_)})) - println(s"lambda=$lambda eigen=${eigen.mkString(",")}") - println(s"Kmeans model cluster centers: ${model.clusterCenters.mkString(",")}") -// println(s"Eigenvalues = ${lambdas.mkString(",")} EigenVectors:\n${printMatrix(collectedEigens, nClusters, nVertices)}") - println(s"Cluster Estimates=:${estimates.collect.mkString(",")}") + val (ccenters, estCollected) = PIC.run(sc, vertices, nClusters, nIterations) + println(s"Cluster centers: ${ccenters.mkString(",")} " + + s"Estimates: ${estCollected.mkString(",")}") } } @@ -184,37 +181,45 @@ object PIClusteringSuite { def pdoub(d: Double) = f"$d%1.6f" - case class Point(x: Double, y: Double) { - override def toString() = s"(${pdoub(x)},${pdoub(y)})" + case class Point(label: Long, x: Double, y: Double) { + def this(x: Double, y: Double) = this(-1L, x, y) + override def toString() = s"($label, (${pdoub(x)},${pdoub(y)}))" + } + object Point { + def apply(x: Double, y: Double) = new Point(-1L, x, y) } + case class CircleSpec(center: Point, radius: Double, noiseToRadiusRatio: Double, nPoints: Int, uniformDistOnCircle: Boolean = true) def createConcentricCirclesData(circleSpecs: Seq[CircleSpec]) = { import org.apache.spark.mllib.random.StandardNormalGenerator val normalGen = new StandardNormalGenerator + var idStart = 0 val circles = for (csp <- circleSpecs) yield { + idStart += 1000 val circlePoints = for (thetax <- 0 until csp.nPoints) yield { val theta = thetax * 2 * Math.PI / csp.nPoints val (x,y) = ( csp.radius * Math.cos(theta) * (1 + normalGen.nextValue * csp.noiseToRadiusRatio), csp.radius * Math.sin(theta) * (1 + normalGen.nextValue * csp.noiseToRadiusRatio)) - Point(x,y) + (Point(idStart+thetax, x,y)) } circlePoints } - val points = circles.flatten + val points = circles.flatten.sortBy(_.label) println(printPoints(points)) points } def printPoints(points: Seq[Point]) = { - val sorted = points.sortWith { case (p1, p2) => - if (LA.withinTol(p1.y-p2.y)) { - p1.x <= p2.x - } else { - p1.y >= p2.y - } - } - sorted.mkString("["," , ","]") +// val sorted = points.sortWith { case (p1, p2) => +// if (LA.withinTol(p1.y-p2.y)) { +// p1.x <= p2.x +// } else { +// p1.y >= p2.y +// } +// } +// sorted.mkString("["," , ","]") + points.mkString("["," , ","]") } def createAffinityMatrix() = {