diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
similarity index 87%
rename from mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala
rename to mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index afc4161c99f93..8ea2e727cfa96 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -35,7 +35,7 @@ import scala.language.existentials
  * representation. The resulting pseudo-eigenvector provides effective clustering - as
  * performed by Parallel KMeans.
  */
-object PIClustering {
+object PowerIterationClustering {
 
   private val logger = Logger.getLogger(getClass.getName())
 
@@ -44,32 +44,32 @@ object PIClustering {
   type DGraph = Graph[Double, Double]
   type IndexedVector[Double] = (Long, BDV[Double])
+
   // Terminate iteration when norm changes by less than this value
-  private[mllib] val DefaultMinNormChange: Double = 1e-11
+  private[mllib] val defaultMinNormChange: Double = 1e-11
 
-  // Default σ for Gaussian Distance calculations
-  private[mllib] val DefaultSigma = 1.0
+  // Default sigma for Gaussian Distance calculations
+  private[mllib] val defaultSigma = 1.0
 
   // Default number of iterations for PIC loop
-  private[mllib] val DefaultIterations: Int = 20
+  private[mllib] val defaultIterations: Int = 20
 
   // Default minimum affinity between points - lower than this it is considered
   // zero and no edge will be created
-  private[mllib] val DefaultMinAffinity = 1e-11
+  private[mllib] val defaultMinAffinity = 1e-11
 
   // Do not allow divide by zero: change to this value instead
-  val DefaultDivideByZeroVal: Double = 1e-15
+  val defaultDivideByZeroVal: Double = 1e-15
 
   // Default number of runs by the KMeans.run() method
-  val DefaultKMeansRuns = 10
+  val defaultKMeansRuns = 10
 
   /**
    *
    * Run a Power Iteration Clustering
    *
    * @param sc Spark Context
-   * @param points Input Points in format of [(VertexId,(x,y)]
-   *               where VertexId is a Long
+   * @param G Affinity Matrix in a Sparse Graph structure
    * @param nClusters Number of clusters to create
    * @param nIterations Number of iterations of the PIC algorithm
    *                    that calculates primary PseudoEigenvector and Eigenvalue
@@ -83,30 +83,13 @@ object PIClustering {
    *         Seq[(VertexId, ClusterID Membership)]
    */
   def run(sc: SparkContext,
-          points: Points,
+          G: Graph[Double, Double],
           nClusters: Int,
-          nIterations: Int = DefaultIterations,
-          sigma: Double = DefaultSigma,
-          minAffinity: Double = DefaultMinAffinity,
-          nRuns: Int = DefaultKMeansRuns)
+          nIterations: Int = defaultIterations,
+          sigma: Double = defaultSigma,
+          minAffinity: Double = defaultMinAffinity,
+          nRuns: Int = defaultKMeansRuns)
   : (Seq[(Int, Vector)], Seq[((VertexId, Vector), Int)]) = {
-    val vidsRdd = sc.parallelize(points.map(_._1).sorted)
-    val nVertices = points.length
-
-    val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
-    val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
-    if (logger.isDebugEnabled) {
-      logger.debug(s"Vt(0)=${
-        printVector(new BDV(initialVt.map {
-          _._2
-        }.toArray))
-      }")
-    }
-    val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
-    val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
-    if (logger.isDebugEnabled) {
-      logger.debug(printMatrixFromEdges(G.edges))
-    }
     val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations)
     // TODO: avoid local collect and then sc.parallelize.
     val localVt = vt.collect.sortBy(_._1)
@@ -140,36 +123,43 @@ object PIClustering {
   }
 
   /**
-   * Read Points from an input file in the following format:
-   *   Vertex1Id Coord11 Coord12 CoordX13 .. Coord1D
-   *   Vertex2Id Coord21 Coord22 CoordX23 .. Coord2D
-   *   ..
-   *   VertexNId CoordN1 CoordN2 CoordN23 .. CoordND
-   *
-   * Where N is the number of observations, each a D-dimension point
    *
-   * E.g.
+   * Create an affinity matrix
-   *
-   *   19 1.8035177495 0.7460582552 0.2361611395 -0.8645567427 -0.8613062
-   *   10 0.5534111111 1.0456386879 1.7045663273 0.7281759816 1.0807487792
-   *   911 1.200749626 1.8962364439 2.5117192131 -0.4034737281 -0.9069696484
-   *
-   * Which represents three 5-dimensional input Points with VertexIds 19,10, and 911
-   * @param verticesFile Local filesystem path to the Points input file
-   * @return Set of Vertices in format appropriate for consumption by the PIC algorithm
+   * @param sc Spark Context
+   * @param points Input Points in format of [(VertexId,(x,y)]
+   *               where VertexId is a Long
+   * @param sigma Sigma for Gaussian distribution calculation according to
+   *              [1/2 *sqrt(pi*sigma)] exp (- (x-y)**2 / 2sigma**2
+   * @param minAffinity Minimum Affinity between two Points in the input dataset: below
+   *                    this threshold the affinity will be considered "close to" zero and
+   *                    no Edge will be created between those Points in the sparse matrix
+   * @return Tuple of (Seq[(Cluster Id,Cluster Center)],
+   *         Seq[(VertexId, ClusterID Membership)]
    */
-  def readVerticesfromFile(verticesFile: String): Points = {
-
-    import scala.io.Source
-    val vertices = Source.fromFile(verticesFile).getLines.map { l =>
-      val toks = l.split("\t")
-      val arr = new BDV(toks.slice(1, toks.length).map(_.toDouble))
-      (toks(0).toLong, arr)
-    }.toSeq
+  def createGaussianAffinityMatrix(sc: SparkContext,
+                                   points: Points,
+                                   sigma: Double = defaultSigma,
+                                   minAffinity: Double = defaultMinAffinity)
+  : Graph[Double, Double] = {
+    val vidsRdd = sc.parallelize(points.map(_._1).sorted)
+    val nVertices = points.length
+
+    val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
+    val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
+    if (logger.isDebugEnabled) {
+      logger.debug(s"Vt(0)=${
+        printVector(new BDV(initialVt.map {
+          _._2
+        }.toArray))
+      }")
+    }
+    val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
+    val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
     if (logger.isDebugEnabled) {
-      logger.debug(s"Read in ${vertices.length} from $verticesFile")
+      logger.debug(printMatrixFromEdges(G.edges))
     }
-    vertices
+    G
   }
 
   /**
@@ -205,7 +195,7 @@ object PIClustering {
    */
   def getPrincipalEigen(sc: SparkContext,
                         G: DGraph,
-                        nIterations: Int = DefaultIterations,
+                        nIterations: Int = defaultIterations,
                         optMinNormChange: Option[Double] = None
                         ): (DGraph, Double, VertexRDD[Double]) = {
 
@@ -312,7 +302,7 @@ object PIClustering {
    * @return
    */
   private[mllib] def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector[Double]],
-                                          minAffinity: Double = DefaultMinAffinity) = {
+                                          minAffinity: Double = defaultMinAffinity) = {
     val labels = wRdd.map { case (vid, vect) => vid}.collect
     val edgesRdd = wRdd.flatMap { case (vid, vect) =>
       for ((dval, ix) <- vect.toArray.zipWithIndex
@@ -387,7 +377,7 @@ object PIClustering {
   }
 
 
-  private[mllib] def makeNonZero(dval: Double, tol: Double = DefaultDivideByZeroVal) = {
+  private[mllib] def makeNonZero(dval: Double, tol: Double = defaultDivideByZeroVal) = {
     if (Math.abs(dval) < tol) {
       Math.signum(dval) * tol
     } else {
diff --git a/mllib/src/test/resources/log4j.mllib.properties b/mllib/src/test/resources/log4j.mllib.properties
deleted file mode 100644
index 4ebcacb764b1f..0000000000000
--- a/mllib/src/test/resources/log4j.mllib.properties
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=DEBUG, file, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.append=true
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.eclipse.jetty=WARN
-log4j.logger.org.apache.spark.shuffle=WARN
-log4j.logger.org.apache.spark.scheduler=WARN
-log4j.logger.org.apache.spark.storage=WARN
-log4j.logger.org.apache.spark.executor=WARN
-log4j.logger.org.apache.spark.CacheManager=WARN
-log4j.logger.org.apache.spark=WARN
-log4j.logger.org.apache.spark.mllib=DEBUG
-log4j.logger.akka=WARN
-log4j.logger.org.scalatest=WARN
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
similarity index 69%
rename from mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala
rename to mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
index da1e4a28f2a59..e1005fb1983ac 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
@@ -19,26 +19,24 @@ package org.apache.spark.mllib.clustering
 
 import breeze.linalg.{DenseVector => BDV}
 import org.apache.log4j.Logger
-import org.apache.spark.graphx._
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.scalatest.FunSuite
 
 import scala.util.Random
 
-class PIClusteringSuite extends FunSuite with LocalSparkContext {
+class PowerIterationClusteringSuite extends FunSuite with MLlibTestSparkContext {
 
   val logger = Logger.getLogger(getClass.getName)
 
-  import org.apache.spark.mllib.clustering.PIClusteringSuite._
+  import org.apache.spark.mllib.clustering.PowerIterationClusteringSuite._
 
-  val PIC = PIClustering
+  val PIC = PowerIterationClustering
   val A = Array
test("concentricCirclesTest") { concentricCirclesTest() } - def concentricCirclesTest() = { val sigma = 1.0 val nIterations = 10 @@ -55,23 +53,22 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext { val nClusters = circleSpecs.size val cdata = createConcentricCirclesData(circleSpecs) - withSpark { sc => - val vertices = new Random().shuffle(cdata.map { p => - (p.label, new BDV(Array(p.x, p.y))) - }) + val vertices = new Random().shuffle(cdata.map { p => + (p.label, new BDV(Array(p.x, p.y))) + }) - val nVertices = vertices.length - val (ccenters, estCollected) = PIC.run(sc, vertices, nClusters, nIterations) - logger.info(s"Cluster centers: ${ccenters.mkString(",")} " + - s"\nEstimates: ${estCollected.mkString("[", ",", "]")}") - assert(ccenters.size == circleSpecs.length, "Did not get correct number of centers") + val nVertices = vertices.length + val G = PIC.createGaussianAffinityMatrix(sc, vertices) + val (ccenters, estCollected) = PIC.run(sc, G, nClusters, nIterations) + logger.info(s"Cluster centers: ${ccenters.mkString(",")} " + + s"\nEstimates: ${estCollected.mkString("[", ",", "]")}") + assert(ccenters.size == circleSpecs.length, "Did not get correct number of centers") - } } } -object PIClusteringSuite { +object PowerIterationClusteringSuite { val logger = Logger.getLogger(getClass.getName) val A = Array @@ -115,26 +112,7 @@ object PIClusteringSuite { } def main(args: Array[String]) { - val pictest = new PIClusteringSuite + val pictest = new PowerIterationClusteringSuite pictest.concentricCirclesTest() } } - -/** - * Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped - * after each test. - * TODO: import this from the graphx test cases package i.e. may need update to pom.xml - */ -trait LocalSparkContext { - /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */ - def withSpark[T](f: SparkContext => T) = { - val conf = new SparkConf() - GraphXUtils.registerKryoClasses(conf) - val sc = new SparkContext("local", "test", conf) - try { - f(sc) - } finally { - sc.stop() - } - } -}