diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PICLinalg.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PICLinalg.scala index cb4d03fb56830..26a2c6da15da3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PICLinalg.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PICLinalg.scala @@ -17,7 +17,11 @@ package org.apache.spark.mllib.clustering +import org.apache.spark.mllib.linalg.Vectors + +import scala.reflect.ClassTag import scala.util.Random +import breeze.linalg.{DenseVector => BDV,DenseMatrix => BDM} /** * PICLinalg @@ -26,53 +30,72 @@ import scala.util.Random object PICLinalg { - type DVector = Array[Double] - type DMatrix = Array[DVector] + type DMatrix = BDM[Double] - type LabeledVector = (String, DVector) + type LabeledVector = (Long, BDV[Double]) - type IndexedVector = (Long, DVector) + type IndexeBDV[Double] = (Long, BDV[Double]) type Vertices = Seq[LabeledVector] - def add(v1: DVector, v2: DVector) = +// implicit def arrayToVect(darr: Array[Double]): BDV[Double] = new BDV(darr) + implicit def bdvToSeq[T](vect: BDV[T])(implicit ct: ClassTag[T]): Seq[T] = vect.toArray.toSeq + implicit def bdvToArray[T](vect: BDV[T])(implicit ct: ClassTag[T]): Array[T] = vect.toArray +// implicit def arrayToSeq(arr: Array[Double]) = Predef.doubleArrayOps(arr) + def add(v1: BDV[Double], v2: BDV[Double]) = v1.zip(v2).map { x => x._1 + x._2} - def mult(v1: DVector, d: Double) = { - v1.map { - _ * d - } + def norm(darr: Array[Double]): Double = { + Math.sqrt(darr.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2)}) + } + + def norm(darr: BDV[Double]): Double = { + darr.norm(2) } - def mult(v1: DVector, v2: DVector) = { - v1.zip(v2).map { case (v1v, v2v) => v1v * v2v} +// +// +// // Implicits to convert between Breeze DenseVector's and Arrays +// implicit def arrToSeq[T](arr: Array[T]): Seq[T] = arr.toSeq +//// implicit def arrayToBDV(darr: Array[Double]): BDV[Double] +//// = Vectors.dense(darr).asInstanceOf[BDV[Double]] +//// implicit def bdvToArray[T](vect: BDV[T])(implicit ct: ClassTag[T]): Array[T] = vect.toArray +//// implicit def bdvToSeq[T](vect: BDV[T])(implicit ct: ClassTag[T]): Seq[T] = vect.toArray.toSeq +// +// def add(v1: BDV[Double], v2: BDV[Double]) = +// v1.zip(v2).map { x => x._1 + x._2} + + def mult(v1: BDV[Double], d: Double) = { + v1 * d } - def multColByRow(v1: DVector, v2: DVector) = { - val mat = for (v1v <- v1) - yield mult(v2, v1v) - // println(s"Col by Row:\n${printMatrix(mat, - // v1.length, v1.length)}") + def mult(v1: BDV[Double], v2: BDV[Double]) = { + v1 * v2 + } + + def multColByRow(v1: BDV[Double], v2: BDV[Double]) = { + val mat = v1 * v2.t mat } - def norm(vect: DVector): Double = { - Math.sqrt(vect.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2)}) + def norm(vect: BDV[Double]): Double = { + vect.norm } - def manhattanNorm(vect: DVector): Double = { - val n = vect.foldLeft(0.0) { case (sum, dval) => sum + Math.abs(dval)} - n / Math.sqrt(vect.size) +// def norm(darr: Array[Double]): Double = { +// Math.sqrt(darr.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2)}) +// } + + def manhattanNorm(vect: BDV[Double]): Double = { + vect.norm(1) } - def dot(v1: DVector, v2: DVector) = { - v1.zip(v2).foldLeft(0.0) { - case (sum, (b, p)) => sum + b * p - } + def dot(v1: BDV[Double], v2: BDV[Double]) : Double = { + v1.dot(v2) } - def onesVector(len: Int): DVector = { - Array.fill(len)(1.0) + def onesVector(len: Int): BDV[Double] = { + BDV.ones(len) } val calcEigenDiffs = true @@ -90,43 +113,25 @@ object PICLinalg { } def transpose(mat: DMatrix) = { - val nCols = mat(0).length - val matT = mat - .flatten - .zipWithIndex - .groupBy { - _._2 % nCols - } - .toSeq.sortBy { - _._1 - } - .map(_._2) - // .map(_.toSeq.sortBy(_._1)) - .map(_.map(_._1)) - .toArray - matT + mat.t } - def printMatrix(mat: Array[Array[Double]]): String - = printMatrix(mat, mat.length, mat.length) + def printMatrix(mat: BDM[Double]): String + = printMatrix(mat, mat.rows, mat.cols) - def printMatrix(darr: Array[DVector], numRows: Int, numCols: Int): String = { - val flattenedArr = darr.zipWithIndex.foldLeft(new DVector(numRows * numCols)) { - case (flatarr, (row, indx)) => - System.arraycopy(row, 0, flatarr, indx * numCols, numCols) - flatarr - } - printMatrix(flattenedArr, numRows, numCols) + def printMatrix(mat: BDM[Double], numRows: Int, numCols: Int): String = { + printMatrix(mat.toArray, numRows, numCols) } - def printMatrix(darr: DVector, numRows: Int, numCols: Int): String = { - val stride = (darr.length / numCols) + def printMatrix(vect: Array[Double], numRows: Int, numCols: Int): String = { + val darr = vect + val stride = darr.length / numCols val sb = new StringBuilder def leftJust(s: String, len: Int) = { " ".substring(0, len - Math.min(len, s.length)) + s } - assert(darr.size == numRows * numCols, + assert(darr.length == numRows * numCols, s"Input array is not correct length (${darr.length}) given #rows/cols=$numRows/$numCols") for (r <- 0 until numRows) { for (c <- 0 until numCols) { @@ -137,49 +142,46 @@ object PICLinalg { sb.toString } - def printVector(dvect: DVector) = { + def printVector(dvect: BDV[Double]) = { dvect.mkString(",") } - def project(basisVector: DVector, inputVect: DVector) = { + def project(basisVector: BDV[Double], inputVect: BDV[Double]) = { val pnorm = makeNonZero(norm(basisVector)) val projectedVect = basisVector.map( _ * dot(basisVector, inputVect) / dot(basisVector, basisVector)) projectedVect } - def subtract(v1: DVector, v2: DVector) = { - val subvect = v1.zip(v2).map { case (v1val, v2val) => v1val - v2val} - subvect + def subtract(v1: BDV[Double], v2: BDV[Double]) = { + v1 - v2 } - def subtractProjection(vect: DVector, basisVect: DVector): DVector = { + def subtractProjection(vect: BDV[Double], basisVect: BDV[Double]): BDV[Double] = { val proj = project(basisVect, vect) val subVect = subtract(vect, proj) subVect } def localPIC(matIn: DMatrix, nClusters: Int, nIterations: Int, - optExpected: Option[(DVector, DMatrix)]) = { + optExpected: Option[(BDV[Double], DMatrix)]) = { var mat = matIn.map(identity) - val numVects = mat.length + val numVects = mat.cols - val (expLambda, expdat) = optExpected.getOrElse((new DVector(0), new DMatrix(0))) + val (expLambda, expdat) = optExpected.getOrElse((new BDV(Array(0.0)), new BDM(0,0))) var cnorm = -1.0 for (k <- 0 until nClusters) { val r = new Random() - var eigen = Array.fill(numVects) { + var eigen = new BDV(Array.fill(numVects) { // 1.0 r.nextDouble - } + }) val enorm = norm(eigen) - eigen.map { e => e / enorm} + eigen *= 1.0 / enorm for (iter <- 0 until nIterations) { - eigen = mat.map { dvect => - dot(dvect, eigen) - } + eigen = mat * eigen cnorm = makeNonZero(norm(eigen)) eigen = eigen.map(_ / cnorm) } @@ -187,7 +189,7 @@ object PICLinalg { val lambda = dot(mat(0), eigen) / eigen(0) eigen = eigen.map(_ * signum) println(s"lambda=$lambda eigen=${printVector(eigen)}") - if (expLambda.length > 0) { + if (expLambda.toArray.length > 0) { val compareVect = eigen.zip(expdat(k)).map { case (a, b) => a / b} println(s"Ratio to expected: lambda=${lambda / expLambda(k)} " + s"Vect=${compareVect.mkString("[", ",", "]")}") @@ -204,26 +206,22 @@ object PICLinalg { } def compareMatrices(m1: DMatrix, m2: DMatrix) = { - m1.zip(m2).forall { case (m1v, m2v) => - m1v.zip(m2v).forall { case (m1vv, m2vv) => withinTol(m1vv - m2vv)} + m1.toArray.zip(m2.toArray).forall { case (m1v, m2v) => + withinTol(m1v - m2v) } } def subtract(mat1: DMatrix, mat2: DMatrix) = { - mat1.zip(mat2).map { case (m1row, m2row) => - m1row.zip(m2row).map { case (m1v, m2v) => m1v - m2v} - } + mat1 - mat2 } - def deflate(mat: DMatrix, lambda: Double, eigen: DVector) = { + def deflate(mat: DMatrix, lambda: Double, eigen: BDV[Double]) = { // mat = mat.map(subtractProjection(_, mult(eigen, lambda))) val eigT = eigen - val projected = multColByRow(eigen, eigT).map(mult(_, lambda)) + val projected = (eigen * eigen.t) * lambda // println(s"projected matrix:\n${printMatrix(projected, // eigen.length, eigen.length)}") - val matOut = mat.zip(projected).map { case (mrow, prow) => - subtract(mrow, prow) - } + val matOut = mat - projected println(s"Updated matrix:\n${ printMatrix(mat, eigen.length, eigen.length) @@ -232,46 +230,39 @@ object PICLinalg { } def mult(mat1: DMatrix, mat2: DMatrix) = { - val mat2T = transpose(mat2) - val outmatT = for {row <- mat1} - yield { - val outRow = mat2T.map { col => - dot(row, col) - } - outRow - } - outmatT + val outMat = mat1 :* mat2 + outMat } - // def mult(mat: DMatrix, vect: DVector): DMatrix = { + // def mult(mat: DMatrix, vect: BDV[Double]): DMatrix = { // val outMat = mat.map { m => // mult(m, vect) // } // outMat // } // - // def mult(vect: DVector, mat: DMatrix): DMatrix = { + // def mult(vect: BDV[Double], mat: DMatrix): DMatrix = { // for {d <- vect.zip(transpose(mat)) } // yield mult(d._2, d._1) // } def scale(mat: DMatrix, d: Double): DMatrix = { - for (row <- mat) yield mult(row, d) + mat * d } - def transpose(vector: DVector) = { + def transpose(vector: BDV[Double]) = { vector.map { d => Array(d)} } def toMat(dvect: Array[Double], ncols: Int) = { - val m = dvect.toSeq.grouped(ncols).map(_.toArray).toArray + val m = dvect.toSeq.grouped(ncols).map(_.toArray) m } - def schurComplement(mat: DMatrix, lambda: Double, eigen: DVector) = { - val eigT = toMat(eigen, eigen.length) // The sense is reversed - val eig = transpose(eigT) - val projected = mult(eig, eigT) + def schurComplement(mat: DMatrix, lambda: Double, eigen: BDV[Double]) = { + val eig = eigen + val eigT = eigen.t + val projected = eig * eigT println(s"projected matrix:\n${ printMatrix(projected, eigen.length, eigen.length) @@ -282,9 +273,9 @@ object PICLinalg { printMatrix(numerat2, eigen.length, eigen.length) }") - val denom1 = mult(eigT, mat) - val denom2 = mult(denom1, toMat(eigen, 1)) - val denom = denom2(0)(0) + val denom1 = eigT * mat + val denom2 = denom1 * eigen + val denom = denom2.toArray(0) println(s"denom is $denom") val projMat = scale(numerat2, 1.0 / denom) println(s"Updated matrix:\n${ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala index 35ce50218b09f..9fe989985cebd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala @@ -22,20 +22,30 @@ import org.apache.spark.SparkContext import org.apache.spark.graphx._ import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.rdd.RDD +import breeze.linalg.{DenseVector => BDV} +/** + * Implements the scalable graph clustering algorithm Power Iteration Clustering (see + * www.icml2010.org/papers/387.pdf). From the abstract: + * + * The input data is first transformed to a normalized Affinity Matrix via Gaussian pairwise + * distance calculations. Power iteration is then used to find a dimensionality-reduced + * representation. The resulting pseudo-eigenvector provides effective clustering - as + * performed by Parallel KMeans. + */ object PIClustering { private val logger = Logger.getLogger(getClass.getName()) - type DVector = Array[Double] +// type BDV[Double] = BDV[Double] type DEdge = Edge[Double] - type LabeledPoint = (VertexId, DVector) + type LabeledPoint = (VertexId, BDV[Double]) type Points = Seq[LabeledPoint] type DGraph = Graph[Double, Double] - type IndexedVector = (Long, DVector) + type IndexedVector[Double] = (Long, BDV[Double]) val DefaultMinNormChange: Double = 1e-11 @@ -202,31 +212,12 @@ object PIClustering { (outG, vnorm, outG.vertices) } - def scalarDot(d1: DVector, d2: DVector) = { - Math.sqrt(d1.zip(d2).foldLeft(0.0) { case (sum, (d1v, d2v)) => - sum + d1v * d2v - }) - } - - def vectorDot(d1: DVector, d2: DVector) = { - d1.zip(d2).map { case (d1v, d2v) => - d1v * d2v - } - } - - def normVect(d1: DVector, d2: DVector) = { - val scaldot = scalarDot(d1, d2) - vectorDot(d1, d2).map { - _ / scaldot - } - } - def readVerticesfromFile(verticesFile: String): Points = { import scala.io.Source val vertices = Source.fromFile(verticesFile).getLines.map { l => val toks = l.split("\t") - val arr = toks.slice(1, toks.length).map(_.toDouble) + val arr = Vectors.dense(toks.slice(1, toks.length).map(_.toDouble)).asInstanceOf[BDV] (toks(0).toLong, arr) }.toSeq if (logger.isDebugEnabled) { @@ -235,8 +226,8 @@ object PIClustering { vertices } - def gaussianDist(c1arr: DVector, c2arr: DVector, sigma: Double) = { - val c1c2 = c1arr.zip(c2arr) + def gaussianDist(c1arr: BDV[Double], c2arr: BDV[Double], sigma: Double) = { + val c1c2 = c1arr.toArray.zip(c2arr.toArray) val dist = Math.exp((-0.5 / Math.pow(sigma, 2.0)) * c1c2.foldLeft(0.0) { case (dist: Double, (c1: Double, c2: Double)) => dist + Math.pow(c1 - c2, 2) @@ -244,11 +235,11 @@ object PIClustering { dist } - def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector], + def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector[Double]], minAffinity: Double = DefaultMinAffinity) = { val labels = wRdd.map { case (vid, vect) => vid}.collect val edgesRdd = wRdd.flatMap { case (vid, vect) => - for ((dval, ix) <- vect.zipWithIndex + for ((dval, ix) <- vect.toArray.zipWithIndex if Math.abs(dval) >= minAffinity) yield Edge(vid, labels(ix), dval) } @@ -258,9 +249,9 @@ object PIClustering { def createNormalizedAffinityMatrix(sc: SparkContext, points: Points, sigma: Double) = { val nVertices = points.length val affinityRddNotNorm = sc.parallelize({ - val ivect = new Array[IndexedVector](nVertices) + val ivect = new Array[IndexedVector[Double]](nVertices) for (i <- 0 until points.size) { - ivect(i) = new IndexedVector(points(i)._1, new DVector(nVertices)) + ivect(i) = new IndexedVector(points(i)._1, BDV(nVertices)) for (j <- 0 until points.size) { val dist = if (i != j) { gaussianDist(points(i)._2, points(j)._2, sigma) @@ -276,8 +267,7 @@ object PIClustering { }, nVertices) if (logger.isDebugEnabled) { logger.debug(s"Affinity:\n${ - LA.printMatrix(affinityRddNotNorm.collect.map(_._2._2), - nVertices, nVertices) + RDDLA.printMatrix(affinityRddNotNorm.map_._2), nVertices, nVertices) }") } val rowSums = affinityRddNotNorm.map { case (ix, (vid, vect)) => @@ -292,17 +282,17 @@ object PIClustering { }) } if (logger.isDebugEnabled) { - logger.debug(s"W:\n${LA.printMatrix(similarityRdd.collect.map(_._2), nVertices, nVertices)}") + logger.debug(s"W:\n${RDDLA.printMatrix(similarityRdd, nVertices, nVertices)}") } (similarityRdd, materializedRowSums) } - def norm(vect: DVector): Double = { + def norm(vect: BDV[Double]): Double = { Math.sqrt(vect.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2)}) } - def printMatrix(darr: Array[DVector], numRows: Int, numCols: Int): String = { - val flattenedArr = darr.zipWithIndex.foldLeft(new DVector(numRows * numCols)) { + def printMatrix(darr: Array[BDV[Double]], numRows: Int, numCols: Int): String = { + val flattenedArr = darr.zipWithIndex.foldLeft(BDV[Double](numRows * numCols)) { case (flatarr, (row, indx)) => System.arraycopy(row, 0, flatarr, indx * numCols, numCols) flatarr @@ -310,7 +300,7 @@ object PIClustering { printMatrix(flattenedArr, numRows, numCols) } - def printMatrix(darr: DVector, numRows: Int, numCols: Int): String = { + def printMatrix(darr: BDV[Double], numRows: Int, numCols: Int): String = { val stride = (darr.length / numCols) val sb = new StringBuilder def leftJust(s: String, len: Int) = { @@ -326,8 +316,8 @@ object PIClustering { sb.toString } - def printVect(dvect: DVector) = { - dvect.mkString(",") + def printVect(dvect: BDV[Double]) = { + dvect.toArray.mkString(",") } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala index 85eb80481dfcd..99a366d7240f6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/RDDLinalg.scala @@ -19,23 +19,25 @@ package org.apache.spark.mllib.clustering import org.apache.spark.graphx.{EdgeRDD, Edge, VertexId} import org.apache.spark.mllib.clustering.{PICLinalg => LA} +import breeze.linalg.{DenseVector => BDV} import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkContext} import org.apache.spark.mllib.clustering.PICLinalg._ import scala.util.Random object RDDLinalg { + val DefaultMinNormAccel: Double = 1e-11 val DefaultIterations: Int = 20 val calcEigenDiffs = true def getPrincipalEigen(sc: SparkContext, - vectRdd: RDD[IndexedVector], + vectRdd: RDD[IndexeBDV[Double]], rddSize: Option[Int] = None, nIterations: Int = DefaultIterations, minNormAccel: Double = DefaultMinNormAccel - ): (Double, DVector) = { + ): (Double, BDV[Double]) = { vectRdd.cache() val rowMajorRdd = vectRdd.map(identity) // Linalg.transpose(vectRdd) @@ -64,7 +66,7 @@ object RDDLinalg { eigenRdd = rowMajorRdd.mapPartitions { piter => val localEigenRdd = bcEigenRdd.value piter.map { case (ix, dvect) => - val d = LA.dot(dvect, localEigenRdd.map(_._2).toArray) + val d = LA.dot(dvect, new BDV(localEigenRdd.map(_._2).toArray)) println(s"localEigenRdd($ix)=$d (from ${dvect.mkString(",")} " + s" and ${localEigenRdd.map(_._2).mkString(",")})") (ix, d) @@ -97,14 +99,14 @@ object RDDLinalg { } vectRdd.unpersist() - // val darr = new DVector(numVects) + // val darr = new BDV[Double](numVects) val eigenVect = eigenRddCollected.map(_._2).toArray val pdiff = eigenRddCollectedPrior.zip(eigenVect).foldLeft(0.0) { case (sum, (e1v, e2v)) => sum + Math.abs(e1v - e2v) } assert(LA.withinTol(pdiff), s"Why is the prior eigenValue not nearly equal to present one: diff=$pdiff") - val lambda = LA.dot(vectRdd.take(1)(0)._2, eigenVect) / eigenVect(0) + val lambda = LA.dot(vectRdd.take(1)(0)._2, new BDV(eigenVect)) / eigenVect(0) // assert(withinTol(lambdaRatio - 1.0), // "According to A *X = lambda * X we should have (A *X / X) ratio = lambda " + // s"but that did not happen: instead ratio=$lambdaRatio") @@ -112,10 +114,10 @@ object RDDLinalg { // println(s"eigenRdd: ${collectedEigenRdd.mkString(",")}") // System.arraycopy(collectedEigenRdd, 0, darr, 0, darr.length) // (cnorm, darr) - (lambda, eigenVect) + (lambda, new BDV(eigenVect)) } - def transpose(indexedRdd: RDD[IndexedVector]) = { + def transpose(indexedRdd: RDD[IndexeBDV[Double]]) = { val nVertices = indexedRdd.count.toInt val ColsPartitioner = new Partitioner() { override def numPartitions: Int = nVertices @@ -139,8 +141,8 @@ object RDDLinalg { columnsRdd } - def subtractProjection(sc: SparkContext, vectorsRdd: RDD[IndexedVector], vect: DVector): - RDD[IndexedVector] = { + def subtractProjection(sc: SparkContext, vectorsRdd: RDD[IndexeBDV[Double]], vect: BDV[Double]): + RDD[IndexeBDV[Double]] = { val bcVect = sc.broadcast(vect) val subVectRdd = vectorsRdd.mapPartitions { iter => val localVect = bcVect.value @@ -161,17 +163,16 @@ object RDDLinalg { val printInputMatrix: Boolean = false - def eigens(sc: SparkContext, matrixRdd: RDD[IndexedVector], nClusters: Int, + def eigens(sc: SparkContext, matrixRdd: RDD[IndexeBDV[Double]], nClusters: Int, nPowerIterations: Int) = { val lambdas = new Array[Double](nClusters) val eigens = new Array[RDD[Array[Double]]](nClusters) var deflatedRdd = matrixRdd.map(identity) // Clone the original matrix val nVertices = deflatedRdd.count.toInt if (printInputMatrix) { - val collectedMatrixRdd = matrixRdd.collect +// val collectedMatrixRdd = matrixRdd.collect println(s"Degrees Matrix:\n${ - LA.printMatrix(collectedMatrixRdd.map(_._2), - nVertices, nVertices) + printMatrix(matrixRdd, nVertices, nVertices) }") } for (ex <- 0 until nClusters) { @@ -182,11 +183,9 @@ object RDDLinalg { deflatedRdd = subtractProjection(sc, deflatedRdd, eigen) // deflatedRdd = sc.parallelize(deflatedRddCollected, nVertices) if (printDeflatedRdd) { - val deflatedRddCollected = deflatedRdd.collect +// val deflatedRddCollected = deflatedRdd.collect println(s"EigensRemovedRDDCollected=\n${ - LA.printMatrix(deflatedRddCollected.map { - _._2 - }, nVertices, nVertices) + printMatrix(deflatedRdd, nVertices, nVertices) }") } val arrarr = new Array[Array[Double]](1) @@ -204,6 +203,11 @@ object RDDLinalg { vertices.map { case (vid, dval) => s"($vid,$dval)"}.mkString(" , ") } + def printMatrix(denseVectorRDD: RDD[LabeledVector], i: Int, i1: Int) = { + denseVectorRDD.collect.map{ + case (vid, dvect) => dvect.toArray}.flatten + } + def printMatrixFromEdges(edgesRdd: EdgeRDD[_]) = { val edgec = edgesRdd.collect // assert(edgec.size < 1e3,"Let us not print a large graph") diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala index 1fbef314e18a2..969e31abea5bb 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala @@ -19,35 +19,10 @@ package org.apache.spark.mllib.clustering import org.apache.spark.graphx._ import org.apache.spark.mllib.clustering.PICLinalg.DMatrix -import org.apache.spark.{SparkConf, SparkContext} import org.scalatest.FunSuite import scala.util.Random - -/** - * Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped - * after each test. - * TODO: import this from the graphx test cases package i.e. may need update to pom.xml - */ -trait LocalSparkContext { - /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */ - def withSpark[T](f: SparkContext => T) = { - val conf = new SparkConf() - GraphXUtils.registerKryoClasses(conf) - val sc = new SparkContext("local", "test", conf) - try { - f(sc) - } finally { - sc.stop() - } - } -} - -/** - * SpectralClusteringWithGraphxSuite - * - */ class PIClusteringSuite extends FunSuite with LocalSparkContext { import org.apache.spark.mllib.clustering.PIClusteringSuite._ @@ -95,6 +70,7 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext { def irisData() = { import org.apache.spark.mllib.linalg._ + import scala.io.Source val irisRaw = Source.fromFile("data/mllib/iris.data").getLines.map(_.split(",")) val iter: Iterator[(Array[Double], String)] = irisRaw.map { toks => (toks.slice(0, toks.length - 1).map { @@ -116,11 +92,6 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext { } - def saveToMatplotLib(dmat: DMatrix, optLegend: Option[Array[String]], optLabels: Option[Array[Array[String]]]) = { - import breeze.plot._ - - } - test("graphxSingleEigen") { graphxSingleEigen } @@ -196,8 +167,6 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext { // val dr = new DRange(0.0, 5.0) // val polyInfo = A(PS(3.0, 2.0, -1.0) // val noiseRatio = 0.1 - // val l = List(1,2,3) - // l.scanLeft( // } } @@ -241,14 +210,6 @@ object PIClusteringSuite { } def printPoints(points: Seq[Point]) = { - // val sorted = points.sortWith { case (p1, p2) => - // if (LA.withinTol(p1.y-p2.y)) { - // p1.x <= p2.x - // } else { - // p1.y >= p2.y - // } - // } - // sorted.mkString("["," , ","]") points.mkString("[", " , ", "]") } @@ -260,7 +221,7 @@ object PIClusteringSuite { A(0.9, .5, .75, 0) ) println(s"Input mat: ${LA.printMatrix(dat1, 4, 4)}") - val D = /*LA.transpose(dat1)*/ dat1.zipWithIndex.map { case (dvect, ix) => + val D = dat1.zipWithIndex.map { case (dvect, ix) => val sum = dvect.foldLeft(0.0) { _ + _ } @@ -279,6 +240,10 @@ object PIClusteringSuite { (dat1, DxDat1) } + def saveToMatplotLib(dmat: DMatrix, optLegend: Option[Array[String]], optLabels: Option[Array[Array[String]]]) = { + + } + def main(args: Array[String]) { val pictest = new PIClusteringSuite pictest.concentricCirclesTest()