diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PICLinalg.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PICLinalg.scala index baf22a7b81ad5..cb4d03fb56830 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PICLinalg.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PICLinalg.scala @@ -137,7 +137,7 @@ object PICLinalg { sb.toString } - def printVect(dvect: DVector) = { + def printVector(dvect: DVector) = { dvect.mkString(",") } @@ -186,7 +186,7 @@ object PICLinalg { val signum = Math.signum(dot(mat(0), eigen)) val lambda = dot(mat(0), eigen) / eigen(0) eigen = eigen.map(_ * signum) - println(s"lambda=$lambda eigen=${printVect(eigen)}") + println(s"lambda=$lambda eigen=${printVector(eigen)}") if (expLambda.length > 0) { val compareVect = eigen.zip(expdat(k)).map { case (a, b) => a / b} println(s"Ratio to expected: lambda=${lambda / expLambda(k)} " + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala index d0724b4d42534..ed3f4c955e257 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala @@ -22,10 +22,6 @@ import org.apache.spark.graphx._ import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.rdd.RDD -/** - * SpectralClusteringWithGraphx - * - */ object PIClustering { type DVector = Array[Double] @@ -46,6 +42,7 @@ object PIClustering { val DefaultMinAffinity = 1e-11 val LA = PICLinalg + val RDDLA = RDDLinalg def cluster(sc: SparkContext, points: Points, @@ -57,10 +54,11 @@ object PIClustering { val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma) val initialVt = createInitialVector(sc, points.map(_._1), rowSums) + println(s"Vt(0)=${LA.printVector(initialVt.map{_._2}.toArray)}") val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity) - val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt)) - val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G) + println(RDDLA.printMatrixFromEdges(G.edges)) + val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations) // TODO: avoid local collect and then sc.parallelize. val localVect = vt.map{Vectors.dense(_)} val vectRdd = sc.parallelize(localVect) @@ -68,15 +66,6 @@ object PIClustering { (model, gUpdated, lambda, vt) } - /* - -vnorm[0]=2.019968019268192 -Updating vertex[0] from 0.2592592592592593 to 0.2597973189724011 -Updating vertex[1] from 0.19753086419753088 to 0.1695805301675885 -Updating vertex[3] from 0.2654320987654321 to 0.27258531045499795 -Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227 - - */ def createInitialVector(sc: SparkContext, labels: Seq[VertexId], rowSums: Seq[Double]) = { @@ -163,12 +152,6 @@ Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227 }) } - // def printGraph(G: DGraph) = { - // val collectedVerts = G.vertices.collect - // val nVertices = collectedVerts.length - // val msg = s"Graph Vertices:\n${printMatrix(collectedVerts, nVertices, nVertices)}" - // } - // def scalarDot(d1: DVector, d2: DVector) = { Math.sqrt(d1.zip(d2).foldLeft(0.0) { case (sum, (d1v, d2v)) => sum + d1v * d2v @@ -224,11 +207,8 @@ Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227 def createNormalizedAffinityMatrix(sc: SparkContext, points: Points, sigma: Double) = { val nVertices = points.length - val rowSums = for (bcx <- 0 until nVertices) - yield sc.accumulator[Double](bcx, s"ColCounts$bcx") val affinityRddNotNorm = sc.parallelize({ val ivect = new Array[IndexedVector](nVertices) - var rsum = 0.0 for (i <- 0 until points.size) { ivect(i) = new IndexedVector(points(i)._1, new DVector(nVertices)) for (j <- 0 until points.size) { @@ -238,9 +218,7 @@ Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227 0.0 } ivect(i)._2(j) = dist - rsum += dist } - rowSums(i) += rsum } ivect.zipWithIndex.map { case (vect, ix) => (ix, vect) @@ -248,14 +226,17 @@ Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227 }, nVertices) println(s"Affinity:\n${LA.printMatrix(affinityRddNotNorm.collect.map(_._2._2), nVertices, nVertices)}") - val materializedRowSums = rowSums.map{ _.value} - val affinityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) => + val rowSums = affinityRddNotNorm.map { case (ix, (vid, vect)) => + vect.foldLeft(0.0){ _ + _} + } + val materializedRowSums = rowSums.collect + val similarityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) => (vid, vect.map { _ / materializedRowSums(rowx) }) } - println(s"W:\n${LA.printMatrix(affinityRdd.collect.map(_._2), nVertices, nVertices)}") - (affinityRdd, materializedRowSums) + println(s"W:\n${LA.printMatrix(similarityRdd.collect.map(_._2), nVertices, nVertices)}") + (similarityRdd, materializedRowSums) } def norm(vect: DVector): Double = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala index 92e0c5ac430c2..17301584bcbde 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.clustering -import org.apache.spark.graphx.VertexId +import org.apache.spark.graphx.{EdgeRDD, Edge, VertexId} import org.apache.spark.mllib.clustering.{PICLinalg => LA} import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkContext} @@ -204,4 +204,12 @@ object RDDLinalg { vertices.map { case (vid, dval) => s"($vid,$dval)"}.mkString(" , ") } + def printMatrixFromEdges(edgesRdd: EdgeRDD[_]) = { + val edgec = edgesRdd.collect + assert(edgec.size < 1e3,"Let us not print a large graph") + val sorted = edgec.sortWith { case (e1, e2) => + e1.srcId < e2.srcId || (e1.srcId == e2.srcId && e1.dstId <= e2.dstId) + } + + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala index b79a6d8343531..71de2d585c2f5 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala @@ -125,7 +125,7 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext { } def concentricCirclesTest() = { val sigma = 1.0 - val nIterations = 50 + val nIterations = 10 val nClusters = 3 val circleSpecs = Seq( // Best results for 30 points