diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala index 8e42db23c6f0c..76ef43f972e5b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala @@ -1,8 +1,248 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.spark.mllib.clustering +import org.apache.spark.SparkContext +import org.apache.spark.graphx.{Edge, Graph, VertexId} +import org.apache.spark.rdd.RDD /** - * Created by fan on 1/22/15. + * Power Iteration Clustering (PIC): builds a row-normalized Gaussian affinity graph over + * the input points and estimates its principal eigenvector by power iteration on GraphX. + * */ -class PIClustering { - -} +object PIClustering { + type DVector = Array[Double] + type DEdge = Edge[Double] + type LabeledPoint = (VertexId, DVector) + type Points = Seq[LabeledPoint] + type DGraph = Graph[Double, Double] + type IndexedVector = (Long, DVector) + val DefaultMinNormChange: Double = 1e-11 + val DefaultSigma = 1.0 + val DefaultIterations: Int = 20 + val DefaultMinAffinity = 1e-11 + val LA = SpectralClusteringUsingRdd.Linalg + def cluster(sc: SparkContext, + points: Points, + nClusters: Int, + nIterations: Int = DefaultIterations, + sigma: Double = DefaultSigma, + minAffinity: Double = DefaultMinAffinity): (DGraph, Double, DVector) = { + val nVertices = points.length + val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma) + val initialVt = createInitialVector(sc, points.map(_._1), rowSums) + val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity) + val G = createGraphFromEdges(sc, edgesRdd, nVertices, Some(initialVt)) + getPrincipalEigen(sc, G) + } + /* + vnorm[0]=2.019968019268192 + Updating vertex[0] from 0.2592592592592593 to 0.2597973189724011 + Updating vertex[1] from 0.19753086419753088 to 0.1695805301675885 + Updating vertex[3] from 0.2654320987654321 to 0.27258531045499795 + Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227 + */ + def createInitialVector(sc: SparkContext, + labels: Seq[VertexId], + rowSums: Seq[Double]) = { + val volume = rowSums.fold(0.0) { + _ + _ + } + val initialVt = labels.zip(rowSums.map(_ / volume)) + initialVt + } + def createGraphFromEdges(sc: SparkContext, + edgesRdd: RDD[DEdge], + nPoints: Int, + optInitialVt: Option[Seq[(VertexId, Double)]] = None) = { + assert(nPoints > 0, "Must provide number of points from the original dataset") + val G = if (optInitialVt.isDefined) { + val initialVt = optInitialVt.get + val vertsRdd = sc.parallelize(initialVt) + Graph(vertsRdd, edgesRdd) + } else { + Graph.fromEdges(edgesRdd, -1.0) + } + G + } + val printMatrices = true + def getPrincipalEigen(sc: SparkContext, + G: DGraph, + nIterations: Int = DefaultIterations, + optMinNormChange: Option[Double] = None + ): (DGraph, Double, DVector) = { + var priorNorm = 
Double.MaxValue + var norm = Double.MaxValue + var priorNormVelocity = Double.MaxValue + var normVelocity = Double.MaxValue + var normAccel = Double.MaxValue + val DummyVertexId = -99L + var vnorm: Double = -1.0 + var outG: DGraph = null + var prevG: DGraph = G + val epsilon = optMinNormChange + .getOrElse(1e-5 / G.vertices.count()) + for (iter <- 0 until nIterations + if Math.abs(normAccel) > epsilon) { + val tmpEigen = prevG.aggregateMessages[Double](ctx => { + ctx.sendToSrc(ctx.attr * ctx.srcAttr); + ctx.sendToDst(ctx.attr * ctx.dstAttr) + }, + _ + _) + println(s"tmpEigen[$iter]: ${tmpEigen.collect.mkString(",")}\n") + // Assign to the outer vnorm (do not shadow it) so the returned norm reflects the last iteration + vnorm = + prevG.vertices.map { _._2 }.fold(0.0) { case (sum, dval) => + sum + Math.abs(dval) + } + println(s"vnorm[$iter]=$vnorm") + outG = prevG.outerJoinVertices(tmpEigen) { case (vid, wval, optTmpEigJ) => + val normedEig = optTmpEigJ.getOrElse { + println("Missing estimated eigenvector element; defaulting to -1.0"); + -1.0 + } / vnorm + println(s"Updating vertex[$vid] from $wval to $normedEig") + normedEig + } + prevG = outG + if (printMatrices) { + val localVertices = outG.vertices.collect + print(s"Vertices[$iter]: ${localVertices.mkString(",")}\n") + } + normVelocity = vnorm - priorNorm + normAccel = normVelocity - priorNormVelocity + println(s"normAccel[$iter]= $normAccel") + priorNorm = vnorm + priorNormVelocity = normVelocity + } + (outG, vnorm, outG.vertices.collect.map { + _._2 + }) + } + // def printGraph(G: DGraph) = { + // val collectedVerts = G.vertices.collect + // val nVertices = collectedVerts.length + // val msg = s"Graph Vertices:\n${printMatrix(collectedVerts, nVertices, nVertices)}" + // } + // + def scalarDot(d1: DVector, d2: DVector) = { + Math.sqrt(d1.zip(d2).foldLeft(0.0) { case (sum, (d1v, d2v)) => + sum + d1v * d2v + }) + } + def vectorDot(d1: DVector, d2: DVector) = { + d1.zip(d2).map { case (d1v, d2v) => + d1v * d2v + } + } + def normVect(d1: DVector, d2: DVector) = { + val scaldot = scalarDot(d1, d2) + vectorDot(d1, d2).map { + _ / scaldot + } + } + def readVerticesfromFile(verticesFile: String): Points = { + import scala.io.Source + val vertices = Source.fromFile(verticesFile).getLines.map { l => + val toks = l.split("\t") + val arr = toks.slice(1, toks.length).map(_.toDouble) + (toks(0).toLong, arr) + }.toSeq + println(s"Read in ${vertices.length} points from $verticesFile") + // println(vertices.map { case (x, arr) => s"($x,${arr.mkString(",")})"} + // .mkString("[", ",\n", "]")) + vertices + } + def gaussianDist(c1arr: DVector, c2arr: DVector, sigma: Double) = { + val c1c2 = c1arr.zip(c2arr) + // Gaussian similarity: exp(-||c1 - c2||^2 / (2 * sigma^2)); note the negative exponent + val dist = Math.exp((-0.5 / Math.pow(sigma, 2.0)) * c1c2.foldLeft(0.0) { + case (dist: Double, (c1: Double, c2: Double)) => + dist + Math.pow(c1 - c2, 2) + }) + dist + } + def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector], + minAffinity: Double = DefaultMinAffinity) = { + val labels = wRdd.map { case (vid, vect) => vid}.collect + val edgesRdd = wRdd.flatMap { case (vid, vect) => + for ((dval, ix) <- vect.zipWithIndex + if Math.abs(dval) >= minAffinity) + yield Edge(vid, labels(ix), dval) + } + edgesRdd + } + def createNormalizedAffinityMatrix(sc: SparkContext, points: Points, sigma: Double) = { + val nVertices = points.length + val rowSums = for (bcx <- 0 until nVertices) + yield sc.accumulator[Double](0.0, s"RowSums$bcx") + val affinityRddNotNorm = sc.parallelize({ + val ivect = new Array[IndexedVector](nVertices) + for (i <- 0 until points.size) { + var rsum = 0.0 + ivect(i) = new 
IndexedVector(points(i)._1, new DVector(nVertices)) + for (j <- 0 until points.size) { + val dist = if (i != j) { + gaussianDist(points(i)._2, points(j)._2, sigma) + } else { + 0.0 + } + ivect(i)._2(j) = dist + rsum += dist + } + rowSums(i) += rsum + } + ivect.zipWithIndex.map { case (vect, ix) => + (ix, vect) + } + }, nVertices) + // Accumulator values are only readable on the driver, so materialize the row sums + // before referencing them inside the RDD transformation below + val materializedRowSums = rowSums.map(_.value) + val affinityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) => + (vid, vect.map { + _ / materializedRowSums(rowx) + }) + } + (affinityRdd, materializedRowSums) + } + def norm(vect: DVector): Double = { + Math.sqrt(vect.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2)}) + } + def printMatrix(darr: Array[DVector], numRows: Int, numCols: Int): String = { + val flattenedArr = darr.zipWithIndex.foldLeft(new DVector(numRows * numCols)) { + case (flatarr, (row, indx)) => + System.arraycopy(row, 0, flatarr, indx * numCols, numCols) + flatarr + } + printMatrix(flattenedArr, numRows, numCols) + } + def printMatrix(darr: DVector, numRows: Int, numCols: Int): String = { + val stride = (darr.length / numCols) + val sb = new StringBuilder + def leftJust(s: String, len: Int) = { + " ".substring(0, len - Math.min(len, s.length)) + s + } + for (r <- 0 until numRows) { + for (c <- 0 until numCols) { + sb.append(leftJust(f"${darr(c * stride + r)}%.6f", 9) + " ") + } + sb.append("\n") + } + sb.toString + } + def printVect(dvect: DVector) = { + dvect.mkString(",") + } +} \ No newline at end of file diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala new file mode 100644 index 0000000000000..9847a4b37fd3b --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala @@ -0,0 +1,268 @@ +package org.apache.spark.mllib.clustering +import org.scalatest.FunSuite +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +/** + * PIClusteringSuite + * + */ +class PIClusteringSuite extends FunSuite with LocalSparkContext { + val SP = SpectralClusteringUsingRdd + val LA = SP.Linalg + val A = Array + test("graphxSingleEigen") { + val dat1 = A( + A(0, 0.4, .8, 0.9), + A(.4, 0, .7, 0.5), + A(0.8, 0.7, 0, 0.75), + A(0.9, .5, .75, 0) + ) + val D = /*LA.transpose(dat1)*/dat1.zipWithIndex.map { case (dvect, ix) => + val sum = dvect.foldLeft(0.0) { + _ + _ + } + dvect.zipWithIndex.map { case (d, dx) => + if (ix == dx) { + 1.0 / sum + } else { + 0.0 + } + } + } + print(s"D =\n ${LA.printMatrix(D)}") + val DxDat1 = LA.mult(D, dat1) + print(s"D * Dat1 =\n ${LA.printMatrix(DxDat1)}") + // val dats = Array((dat2, expDat2), (dat1, expDat1)) + val SC = SpectralClustering + val sigma = 1.0 + val nIterations = 20 + val nClusters = 3 + withSpark { sc => + val affinityRdd = sc.parallelize(DxDat1.zipWithIndex).map { case (dvect, ix) => + (ix.toLong, dvect) + } + val wCollected = affinityRdd.collect + val nVertices = dat1(0).length + println(s"AffinityMatrix:\n${printMatrix(wCollected.map(_._2), nVertices, nVertices)}") + val edgesRdd = SC.createSparseEdgesRdd(sc, affinityRdd) + val edgesRddCollected = edgesRdd.collect() + println(s"edges=${edgesRddCollected.mkString(",")}") + val rowSums = dat1.map { vect => + vect.foldLeft(0.0){ _ + _ } + } + val initialVt = SC.createInitialVector(sc, affinityRdd.map{_._1}.collect, rowSums ) + val G = SC.createGraphFromEdges(sc, edgesRdd, nVertices, Some(initialVt)) + val printVtVectors: Boolean = true + if (printVtVectors) { + val vCollected = G.vertices.collect() + val graphInitialVt = vCollected.map { + _._2 + } + println(s" initialVT vertices: ${LA.printVertices(initialVt.toArray)}") + println(s"graphInitialVt vertices: ${LA.printVertices(vCollected)}") + val initialVtVect = initialVt.map { + _._2 + }.toArray + println(s"graphInitialVt=${graphInitialVt.mkString(",")}\n" + + s"initialVt=${initialVt.mkString(",")}") + assert(LA.compareVectors(graphInitialVt, initialVtVect)) + } + val (g2, norm, eigvect) = SC.getPrincipalEigen(sc, G, nIterations) + println(s"lambda=$norm eigvect=${eigvect.mkString(",")}") + } + } + test("VectorArithmeticAndProjections") { + // def A[T : ClassTag](ts: T*) = Array(ts:_*) + // type A = Array[Double] + var dat = A( + A(1.0, 2.0, 3.0), + A(3.0, 6.0, 9.0) + ) + var firstEigen = LA.subtractProjection(dat(0), dat(1)) // collinear + println(s"firstEigen should be all 0's ${firstEigen.mkString(",")}") + assert(firstEigen.forall(LA.withinTol(_, 1e-8))) + dat = A( + A(1.0, 2.0, 3.0), + A(-3.0, -6.0, -9.0), + A(2.0, 4.0, 6.0), + A(-1.0, 1.0, -.33333333333333), + A(-2.0, 5.0, 2.0) + ) + var proj = LA.subtractProjection(dat(0), dat(3)) // orthog + println(s"proj after subtracting orthogonal vector should be same " + + s"as input (1.,2.,3.) ${proj.mkString(",")}") + assert(proj.zip(dat(0)).map { case (a, b) => a - b} .forall(LA.withinTol(_, 1e-11))) + val addMultVect = LA.add(dat(0), LA.mult(dat(3), 3.0)) + assert(addMultVect.zip(dat(4)).map { case (a, b) => a - b} .forall(LA.withinTol(_, 1e-11)), + "AddMult was not within tolerance") + proj = LA.subtractProjection(addMultVect, dat(3)) // orthog + println(s"proj should be same as parallel input (1.,2.,3.) 
${proj.mkString(",")}") + assert(proj.zip(dat(0)).map { case (a, b) => a - b} .forall(LA.withinTol(_, 1e-11))) + } + test("eigensTest") { + var dat0 = toMat(A(2., 1.5, 2, .5, 3, .5, 1., .5, 4.), 3) + val expDat0 = (A(-0.7438459, -0.4947461, -0.4493547), + toMat(A(0.5533067, 0.3680148, 0.3342507, + 0.3680148, 0.2447737, 0.2223165, 0.3342507, 0.2223165, 0.2019197), 3) + ) + val dat1 = A( + A(3.0, 2.0, 4.0), + A(2.0, 0.0, 2.0), + A(4.0, 2.0, 3.0) + ) + val expDat1 = + (A(8.0, -1.0, -1.0), + A( + A(0.6666667, 0.7453560, 0.0000000), + A(0.3333333, -0.2981424, -0.8944272), + A(0.6666667, -0.5962848, 0.4472136) + )) + val dat2 = A( + A(1.0, 1, -2.0), + A(-1.0, 2.0, 1.0), + A(0.0, 1.0, -1.0) + ) + // val dat2 = A( + // A(1.0, -1.0, 0.0), + // A(1.0, 2.0, 1.0), + // A(-2.0, 1.0, -1.0) + // ) + val expDat2 = + (A(2.0, -1.0, 1.0), + A( + A(-0.5773503, -0.1360828, -7.071068e-01), + A(0.5773503, -0.2721655, -3.188873e-16), + A(0.5773503, 0.9525793, 7.071068e-01) + )) + val dats = Array((dat0, expDat0)) + // val dats = Array((dat2, expDat2), (dat1, expDat1)) + for ((dat, expdat) <- dats) { + val sigma = 1.0 + val nIterations = 10 // 20 + val nClusters = 3 + withSpark { sc => + var datRdd = LA.transpose( + sc.parallelize((0 until dat.length).zip(dat).map { case (ix, dvect) => + (ix, dvect) + }.toSeq)) + val datRddCollected = datRdd.collect() + val (eigvals, eigvects) = LA.eigens(sc, datRdd, nClusters, nIterations) + val collectedEigens = eigvects.collect + val printedEigens = LA.printMatrix(collectedEigens, 3, 3) + println(s"eigvals=${eigvals.mkString(",")} eigvects=\n$printedEigens}") + assert(LA.compareVectors(eigvals, expdat._1)) + assert(LA.compareMatrices(eigvects.collect, expdat._2)) + } + } + } + test("matrix mult") { + val m1 = toMat(A(1., 2., 3., 4., 5., 6., 7., 8., 9.), 3) + val m2 = toMat(A(3., 1., 2., 3., 4., 5., 6., 7., 8.), 3) + val mprod = LA.mult(m1, m2) + println(s"Matrix1:\n ${printMatrix(mprod, 3, 3)}") + val m21 = toMat(A(1., 2., 3., 4., 5., 6., 7., 8., 9.), 3) + val m22 = toMat(A(10., 11., 12., 13., 14., 15.), 2) + val mprod2 = LA.mult(m21, m22) + println(s"Matrix2:\n ${printMatrix(mprod2, 3, 3)}") + } + test("positiveEigenValues") { + var dat2r = toMat(A(2., 1.5, 2, .5, 3, .5, 1., .5, 4.), 3) + val expLambda = A(5.087874, 2.810807, 1.101319) + val expdat = LA.transpose(toMat(A(0.6196451, -0.2171220, 0.9402498, 0.3200200, -0.8211316, -0.1699035, 0.7166779, 0.5278267, -0.2950646), 3)) + val nClusters = 3 + val nIterations = 20 + val numVects = 3 + LA.localPIC(dat2r, nClusters, nIterations, Some((expLambda, expdat))) + } + def toMat(dvect: Array[Double], ncols: Int) = { + val m = dvect.toSeq.grouped(ncols).map(_.toArray).toArray + m + } + test("positiveEigenValuesTaketwo") { + val dat2r = toMat(A(2., 1.5, 2, .5, 3, .5, 1., .5, 2.), 3) + val expLambda = A(4.2058717, 2.2358331, 0.5582952) + val expdat = LA.transpose(toMat(A(-0.7438459, 0.4718612, -0.82938843, + -0.4947461, -0.6777315, 0.05601231, -0.4493547, 0.5639389, 0.55585740), 3)) + val nClusters = 3 + val nIterations = 20 + val numVects = 3 + LA.localPIC(dat2r, nClusters, nIterations, Some((expLambda, expdat))) + } + test("manualPowerIt") { + // R code + // + //> x = t(matrix(c(-2,-.5,2,.5,-3,.5,-1.,.5,4),nrow=3,ncol=3)) + //> x + // [,1] [,2] [,3] + //[1,] -2.0 -0.5 2.0 + //[2,] 0.5 -3.0 0.5 + //[3,] -1.0 0.5 4.0 + // + //> eigen(x) + //$values + //[1] 3.708394 -2.639960 -2.068434 + // + //$vectors + // [,1] [,2] [,3] + //[1,] 0.32182428 0.56847491 -0.85380536 + //[2,] 0.09420476 0.82235947 -0.51117379 + //[3,] 0.94210116 
0.02368917 -0.09857872 + // + //x = t(matrix(c(-2.384081, 0.387542, -2.124275, -0.612458, -3.032928, 0.170814, 0.875725, 0.170814, 0.709039 ),nrow=3,ncol=3)) + //> eigen(x) + //$values + //[1] -2.6400564672 -2.0684340334 0.0005205006 + // + //$vectors + // [,1] [,2] [,3] + //[1,] -0.5192257 0.8053757 0.6396646 + //[2,] 0.8496238 -0.5503942 -0.1713433 + //[3,] 0.0924343 -0.2200823 -0.7493135 + // var dat2r = A( + // A(-2.0, -0.5, 2.0), + // A(0.5, -3.0, 0.5), + // A(-0.5, 0.5, 4.0) + // ) + var dat2r = toMat(A(-2., -.5, 2, .5, -3, .5, -1., .5, 4.), 3) + val expLambda = A(3.708394, -2.639960, -2.068434) + val expdat = LA.transpose(toMat(A(0.32182428, 0.56847491, -0.85380536, 0.09420476, + 0.82235947, -0.51117379, 0.94210116, 0.02368917, -0.09857872), 3)) + val nClusters = 3 + val nIterations = 30 + val numVects = 3 + LA.localPIC(dat2r, nClusters, nIterations, Some((expLambda, expdat))) + } + test("fifteenVerticesTest") { + val vertFile = "../data/graphx/new_lr_data.15.txt" + val sigma = 1.0 + val nIterations = 20 + val nClusters = 3 + withSpark { sc => + val vertices = SpectralClusteringUsingRdd.readVerticesfromFile(vertFile) + val nVertices = vertices.length + val (lambdas, eigens) = SpectralClusteringUsingRdd.cluster(sc, vertices, nClusters, sigma, nIterations) + val collectedRdd = eigens.collect + println(s"DegreeMatrix:\n${printMatrix(collectedRdd, nVertices, nVertices)}") + val collectedEigens = eigens.collect + println(s"Eigenvalues = ${lambdas.mkString(",")} EigenVectors:\n${printMatrix(collectedEigens, nClusters, nVertices)}") + } + } + def printMatrix(darr: Array[Double], numRows: Int, numCols: Int): String = + LA.printMatrix(darr, numRows, numCols) + def printMatrix(darr: Array[Array[Double]], numRows: Int, numCols: Int): String = + LA.printMatrix(darr, numRows, numCols) +} \ No newline at end of file
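
For review context, a minimal, hypothetical driver sketch showing how the new PIClustering.cluster entry point would be invoked. The three 2-D points, the app name, and the local master are illustration-only assumptions, not part of this patch. As the patch stands, cluster builds the row-normalized affinity graph and returns the (graph, norm, eigenvector-estimate) triple from getPrincipalEigen; the nClusters argument is accepted but not yet used for a final cluster assignment.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.clustering.PIClustering

    object PIClusteringExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("pic-example"))
        // Points = Seq[(VertexId, Array[Double])]: a vertex id plus a feature vector.
        val points = Seq(
          (0L, Array(1.0, 1.0)),
          (1L, Array(1.1, 0.9)),
          (2L, Array(8.0, 8.0)))
        // cluster() builds the Gaussian affinity graph and runs power iteration on it.
        val (graph, norm, eigenVector) = PIClustering.cluster(sc, points, nClusters = 2)
        println(s"graph has ${graph.vertices.count()} vertices")
        println(s"norm=$norm eigenvector=${eigenVector.mkString(",")}")
        sc.stop()
      }
    }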
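
A self-contained, plain-Scala sketch of the numerical update that getPrincipalEigen performs through aggregateMessages (v <- W * v followed by L1 renormalization), using the 4x4 affinity matrix `dat1` from the graphxSingleEigen test. This illustrates the power-iteration idea only; it is not the GraphX message-passing code path.

    object PowerIterationSketch {
      def main(args: Array[String]): Unit = {
        // Affinity matrix `dat1` from PIClusteringSuite.graphxSingleEigen.
        val w = Array(
          Array(0.0, 0.4, 0.8, 0.9),
          Array(0.4, 0.0, 0.7, 0.5),
          Array(0.8, 0.7, 0.0, 0.75),
          Array(0.9, 0.5, 0.75, 0.0))
        // Row-normalize, as createNormalizedAffinityMatrix does.
        val rowSums = w.map(_.sum)
        val wNorm = w.zip(rowSums).map { case (row, s) => row.map(_ / s) }
        // Initial vector v0(i) = rowSum(i) / volume, as in createInitialVector.
        val volume = rowSums.sum
        var v = rowSums.map(_ / volume)
        for (iter <- 0 until 20) {
          // One power-iteration step: v <- W_norm * v, then renormalize by the L1 norm.
          val wv = wNorm.map(row => row.zip(v).map { case (a, b) => a * b }.sum)
          val l1 = wv.map(math.abs).sum
          v = wv.map(_ / l1)
          println(s"iter $iter: ${v.mkString(", ")}")
        }
      }
    }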