Added ConcentricCircles data generation and KMeans clustering
sboeschhuawei committed Jan 23, 2015
1 parent 3fd5bc8 commit 0ef163f
Showing 3 changed files with 90 additions and 15 deletions.
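In short: PIClustering.cluster now hands the principal eigenvector to MLlib's KMeans and returns the fitted model alongside the updated graph and eigenpair, and the test suite gains a concentric-circles data generator to exercise it. A minimal sketch of the new call shape, pieced together from the test code below (illustrative usage, not a line-for-line excerpt from the commit):

    val (model, graph, lambda, eigen) = PIC.cluster(sc, vertices, nClusters, nIterations, sigma)
    val assignments = model.predict(sc.parallelize(eigen.map(Vectors.dense(_))))
    println(s"Cluster assignments: ${assignments.collect.mkString(",")}")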
@@ -126,6 +126,8 @@ object PICLinalg {
" ".substring(0, len - Math.min(len, s.length)) + s
}

assert(darr.length == numRows * numCols,
s"Input array is not correct length (${darr.length}) given #rows/cols=$numRows/$numCols")
for (r <- 0 until numRows) {
for (c <- 0 until numCols) {
sb.append(leftJust(f"${darr(r * stride + c)}%.6f", 9) + " ")
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.clustering

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

/**
@@ -59,7 +60,12 @@ object PIClustering {
val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)

val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
-    getPrincipalEigen(sc, G)
+    val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G)
+    // TODO: avoid local collect and then sc.parallelize.
+    val localVect = vt.map(Vectors.dense(_))
+    val vectRdd = sc.parallelize(localVect)
+    val model = KMeans.train(vectRdd, nClusters, 10) // k from the caller rather than a hard-coded 3; 10 caps the KMeans iterations
+    (model, gUpdated, lambda, vt)
}
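One way the TODO above could be resolved is to read the eigenvector components straight off the updated graph's vertex RDD instead of collecting vt to the driver and re-parallelizing it. A hedged sketch, assuming each vertex attribute of gUpdated holds that vertex's eigenvector component as a Double (as the power-iteration trace below suggests); keeping the VertexId alongside would also let predictions be joined back onto the graph:

    // Hypothetical alternative, not part of this commit: stay distributed end to end.
    val vectRdd = gUpdated.vertices.map { case (_, component) => Vectors.dense(component) }
    val model = KMeans.train(vectRdd, nClusters, 10)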

/*
@@ -219,7 +225,7 @@ Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227
def createNormalizedAffinityMatrix(sc: SparkContext, points: Points, sigma: Double) = {
val nVertices = points.length
val rowSums = for (bcx <- 0 until nVertices)
-      yield sc.accumulator[Double](bcx, s"ColCounts$bcx")
+        yield sc.accumulator[Double](bcx, s"ColCounts$bcx")
val affinityRddNotNorm = sc.parallelize({
val ivect = new Array[IndexedVector](nVertices)
var rsum = 0.0
@@ -240,14 +246,13 @@ Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227
(ix, vect)
}
}, nVertices)
+    val materializedRowSums = rowSums.map(_.value)
val affinityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) =>
(vid, vect.map {
-        _ / rowSums(rowx).value
+        _ / materializedRowSums(rowx)
})
}
-    (affinityRdd, rowSums.map {
-      _.value
-    })
+    (affinityRdd, materializedRowSums)
}
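For context on the materializedRowSums change above: a Spark accumulator's value may only be read on the driver (tasks can only add to it), so the replaced code, which evaluated rowSums(rowx).value inside the normalization closure, read accumulators from worker tasks; materializing the sums into a plain Array[Double] on the driver first means the closure ships ordinary data. A standalone illustration of the pattern using the Spark 1.x accumulator API (hypothetical example, not code from this commit):

    val acc = sc.accumulator(0.0, "sum")            // created on the driver
    sc.parallelize(1 to 10).foreach(i => acc += i)  // tasks may only add
    val total = acc.value                           // .value is driver-only
    val scaled = sc.parallelize(1 to 10).map(_ / total) // closure captures a plain Double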

def norm(vect: DVector): Double = {
@@ -17,8 +17,9 @@

package org.apache.spark.mllib.clustering

-import org.apache.spark.graphx._
+import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.graphx._
import org.scalatest.FunSuite


@@ -56,7 +57,7 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
graphxSingleEigen
}

-  import PIClusteringSuite._
+  import org.apache.spark.mllib.clustering.PIClusteringSuite._
def graphxSingleEigen() = {

val (aMat, dxDat1) = createAffinityMatrix()
@@ -100,22 +101,59 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
}

test("fifteenVerticesTest") {
fifteenVerticesTest()
}
def fifteenVerticesTest() = {
val vertFile = "../data/graphx/new_lr_data.15.txt"
val sigma = 1.0
val nIterations = 20
val nClusters = 3
withSpark { sc =>
val vertices = PIC.readVerticesfromFile(vertFile)
val nVertices = vertices.length
-      val (graph, lambda, eigen) = PIC.cluster(sc, vertices, nClusters,
+      val (model, graph, lambda, eigen) = PIC.cluster(sc, vertices, nClusters,
nIterations, sigma)
val collectedRdd = eigen // .collect
println(s"DegreeMatrix:\n${printMatrix(collectedRdd, nVertices, nVertices)}")
println(s"Eigenvalue = $lambda EigenVectors:\n${printMatrix(collectedRdd, nClusters, nVertices)}")
println(s"DegreeMatrix:\n${LA.printMatrix(collectedRdd, nVertices, nVertices)}")
println(s"Eigenvalue = $lambda EigenVectors:\n${LA.printMatrix(collectedRdd, nClusters, nVertices)}")
// println(s"Eigenvalues = ${lambdas.mkString(",")} EigenVectors:\n${printMatrix(collectedEigens, nClusters, nVertices)}")
}
}

test("concentricCirclesTest") {
concentricCirclesTest()
}
def concentricCirclesTest() = {
val sigma = 1.0
val nIterations = 20
val nClusters = 3
val circleSpecs = Seq(
CircleSpec(Point(0.0,0.0), 0.2, .1, 4),
CircleSpec(Point(0.0,0.0), 1.0, .1, 8),
CircleSpec(Point(0.0,0.0), 2.0, .1, 16)
// CircleSpec(Point(0.0,0.0), 0.2, .1, 5),
// CircleSpec(Point(0.0,0.0), 1.0, .1, 15),
// CircleSpec(Point(0.0,0.0), 2.0, .1, 30)
)
withSpark { sc =>
val vertices = createConcentricCirclesData(circleSpecs).zipWithIndex.map { case (p, ix) =>
(ix.toLong, Array(p.x, p.y))
}

val nVertices = vertices.length
val (model, graph, lambda, eigen) = PIC.cluster(sc, vertices, nClusters,
nIterations, sigma)
val collectedRdd = eigen // .collect
// println(s"DegreeMatrix:\n${LA.printMatrix(collectedRdd, nVertices, nVertices)}")
println(s"Eigenvalue = $lambda EigenVector: ${collectedRdd.mkString(",")}")
val estimates = model.predict(sc.parallelize(eigen.map{Vectors.dense(_)}))
println(s"lambda=$lambda eigen=${eigen.mkString(",")}")
println(s"Kmeans model cluster centers: ${model.clusterCenters.mkString(",")}")
// println(s"Eigenvalues = ${lambdas.mkString(",")} EigenVectors:\n${printMatrix(collectedEigens, nClusters, nVertices)}")
println(s"Cluster Estimates=:${estimates.collect.mkString(",")}")
}
}
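For contrast, running KMeans directly on the raw (x, y) coordinates would be expected to carve the plane into wedge-shaped regions rather than recover the rings, which is the motivation for clustering the eigenvector instead. A hedged sketch of that baseline, written to sit inside the withSpark block above (illustrative, not part of the commit):

    val xyRdd = sc.parallelize(vertices.map { case (_, coords) => Vectors.dense(coords) })
    val rawModel = KMeans.train(xyRdd, nClusters, nIterations)
    println(s"Raw-coordinate centers: ${rawModel.clusterCenters.mkString(",")}")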

// test("testLinearFnGenerator") {
// val PS = PolySpec
// val dr = new DRange(0.0, 5.0)
@@ -136,9 +174,37 @@ object PIClusteringSuite {
val LA = PICLinalg
val A = Array

-  def toMat(dvect: Array[Double], ncols: Int) = {
-    val m = dvect.toSeq.grouped(ncols).map(_.toArray).toArray
-    m
-  }
case class Point(x: Double, y: Double) {
override def toString() = s"($x,$y)"
}
case class CircleSpec(center: Point, radius: Double, noiseToRadiusRatio: Double,
nPoints: Int, uniformDistOnCircle: Boolean = true)
def createConcentricCirclesData(circleSpecs: Seq[CircleSpec]) = {
import org.apache.spark.mllib.random.StandardNormalGenerator
val normalGen = new StandardNormalGenerator
val circles = for (csp <- circleSpecs) yield {
val circlePoints = for (thetax <- 0 until csp.nPoints) yield {
val theta = thetax * 2 * Math.PI / csp.nPoints
val (x, y) = (csp.center.x + csp.radius * Math.cos(theta) * (1 + normalGen.nextValue * csp.noiseToRadiusRatio),
  csp.center.y + csp.radius * Math.sin(theta) * (1 + normalGen.nextValue * csp.noiseToRadiusRatio)) // offset by csp.center so CircleSpec.center is honored (every spec in the test uses the origin, so those results are unchanged)
Point(x,y)
}
circlePoints
}
val points = circles.flatten
printPoints(points)
points
}
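A minimal usage sketch of the generator, using the same specs as the test above; it should yield 4 + 8 + 16 = 28 noisy points on three rings around the origin (illustrative only):

    val specs = Seq(
      CircleSpec(Point(0.0, 0.0), 0.2, 0.1, 4),
      CircleSpec(Point(0.0, 0.0), 1.0, 0.1, 8),
      CircleSpec(Point(0.0, 0.0), 2.0, 0.1, 16))
    val points = createConcentricCirclesData(specs)
    assert(points.length == specs.map(_.nPoints).sum) // 28 points in total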

def printPoints(points: Seq[Point]) = {
val sorted = points.sortWith { case (p1, p2) =>
if (p1.y == p2.y) {
p1.x <= p2.x
} else {
p1.y >= p2.y
}
}
println(sorted.mkString("[", " , ", "]")) // emit the sorted points; callers use printPoints for its side effect
}

def createAffinityMatrix() = {
@@ -182,6 +248,8 @@

def main(args: Array[String]) {
val pictest = new PIClusteringSuite
-    pictest.graphxSingleEigen()
+    // pictest.graphxSingleEigen()
+    // pictest.fifteenVerticesTest()
+    pictest.concentricCirclesTest()
}
}
