diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index ef3890962494d..2a0f8c11d0a50 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -29,7 +29,7 @@ private[spark] object BLAS extends Serializable {
   @transient private var _nativeBLAS: NetlibBLAS = _
 
   // For level-1 routines, we use Java implementation.
-  private def f2jBLAS: NetlibBLAS = {
+  private[ml] def f2jBLAS: NetlibBLAS = {
     if (_f2jBLAS == null) {
       _f2jBLAS = new F2jBLAS
     }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index d626f04599670..0955d3e6e1f8f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -35,6 +35,7 @@ import org.apache.spark.{Dependency, Partitioner, ShuffleDependency, SparkContex
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.BLAS
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -363,7 +364,7 @@ class ALSModel private[ml] (
    * relatively efficient, the approach implemented here is significantly more efficient.
    *
    * This approach groups factors into blocks and computes the top-k elements per block,
-   * using a simple dot product (instead of gemm) and an efficient [[BoundedPriorityQueue]].
+   * using dot product and an efficient [[BoundedPriorityQueue]] (instead of gemm).
    * It then computes the global top-k by aggregating the per block top-k elements with
    * a [[TopByKeyAggregator]]. This significantly reduces the size of intermediate and shuffle data.
    * This is the DataFrame equivalent to the approach used in
@@ -393,31 +394,18 @@ class ALSModel private[ml] (
         val m = srcIter.size
         val n = math.min(dstIter.size, num)
         val output = new Array[(Int, Int, Float)](m * n)
-        var j = 0
+        var i = 0
         val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
         srcIter.foreach { case (srcId, srcFactor) =>
           dstIter.foreach { case (dstId, dstFactor) =>
-            /*
-             * The below code is equivalent to
-             * `val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
-             * This handwritten version is as or more efficient as BLAS calls in this case.
-             */
-            var score = 0.0f
-            var k = 0
-            while (k < rank) {
-              score += srcFactor(k) * dstFactor(k)
-              k += 1
-            }
+            // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
+            val score = BLAS.f2jBLAS.sdot(rank, srcFactor, 1, dstFactor, 1)
             pq += dstId -> score
           }
-          val pqIter = pq.iterator
-          var i = 0
-          while (i < n) {
-            val (dstId, score) = pqIter.next()
-            output(j + i) = (srcId, dstId, score)
+          pq.foreach { case (dstId, score) =>
+            output(i) = (srcId, dstId, score)
             i += 1
           }
-          j += n
           pq.clear()
         }
         output.toSeq
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
index 0cd68a633c0b5..cb97742245689 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
@@ -31,7 +31,7 @@ private[spark] object BLAS extends Serializable with Logging {
   @transient private var _nativeBLAS: NetlibBLAS = _
 
   // For level-1 routines, we use Java implementation.
-  private def f2jBLAS: NetlibBLAS = {
+  private[mllib] def f2jBLAS: NetlibBLAS = {
     if (_f2jBLAS == null) {
       _f2jBLAS = new F2jBLAS
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index d45866c016d91..ac709ad72f0c0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -20,8 +20,6 @@ package org.apache.spark.mllib.recommendation
 import java.io.IOException
 import java.lang.{Integer => JavaInteger}
 
-import scala.collection.mutable
-
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.hadoop.fs.Path
@@ -33,7 +31,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
 import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.BLAS
 import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
@@ -263,6 +261,19 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
 
   /**
    * Makes recommendations for all users (or products).
+   *
+   * Note: the previous approach used for computing top-k recommendations aimed to group
+   * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
+   * be used for efficiency. However, this causes excessive GC pressure due to the large
+   * arrays required for intermediate result storage, as well as a high sensitivity to the
+   * block size used.
+   *
+   * The following approach still groups factors into blocks, but instead computes the
+   * top-k elements per block, using dot product and an efficient [[BoundedPriorityQueue]]
+   * (instead of gemm). This avoids any large intermediate data structures and results
+   * in significantly reduced GC pressure as well as shuffle data, which far outweighs
+   * any cost incurred from not using Level 3 BLAS operations.
+   *
    * @param rank rank
    * @param srcFeatures src features to receive recommendations
   * @param dstFeatures dst features used to make recommendations
@@ -277,46 +288,22 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
     val srcBlocks = blockify(srcFeatures)
     val dstBlocks = blockify(dstFeatures)
-    /**
-     * The previous approach used for computing top-k recommendations aimed to group
-     * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
-     * be used for efficiency. However, this causes excessive GC pressure due to the large
-     * arrays required for intermediate result storage, as well as a high sensitivity to the
-     * block size used.
-     * The following approach still groups factors into blocks, but instead computes the
-     * top-k elements per block, using a simple dot product (instead of gemm) and an efficient
-     * [[BoundedPriorityQueue]]. This avoids any large intermediate data structures and results
-     * in significantly reduced GC pressure as well as shuffle data, which far outweighs
-     * any cost incurred from not using Level 3 BLAS operations.
-     */
     val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
       val m = srcIter.size
       val n = math.min(dstIter.size, num)
       val output = new Array[(Int, (Int, Double))](m * n)
-      var j = 0
+      var i = 0
       val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
       srcIter.foreach { case (srcId, srcFactor) =>
         dstIter.foreach { case (dstId, dstFactor) =>
-          /*
-           * The below code is equivalent to
-           * `val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)`
-           * This handwritten version is as or more efficient as BLAS calls in this case.
-           */
-          var score: Double = 0
-          var k = 0
-          while (k < rank) {
-            score += srcFactor(k) * dstFactor(k)
-            k += 1
-          }
+          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
+          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
          pq += dstId -> score
         }
-        val pqIter = pq.iterator
-        var i = 0
-        while (i < n) {
-          output(j + i) = (srcId, pqIter.next())
+        pq.foreach { case (dstId, score) =>
+          output(i) = (srcId, (dstId, score))
           i += 1
         }
-        j += n
         pq.clear()
       }
       output.toSeq
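Both recommendForAll implementations above follow the same pattern after this patch: for each pair of a source-factor block and a destination-factor block, score every destination factor against each source factor with a level-1 BLAS dot product, and keep only the top num candidates per source in a bounded priority queue instead of materializing a full gemm result. The following is a minimal, self-contained Scala sketch of that per-block step, not Spark code: a plain while-loop dot product stands in for BLAS.f2jBLAS.sdot/ddot, a size-bounded scala.collection.mutable.PriorityQueue stands in for Spark's private BoundedPriorityQueue, and the names TopKPerBlockSketch and topKPerBlock are illustrative only.

import scala.collection.mutable

object TopKPerBlockSketch {

  // Plain dot product standing in for BLAS.f2jBLAS.sdot(rank, src, 1, dst, 1).
  private def dot(rank: Int, src: Array[Float], dst: Array[Float]): Float = {
    var s = 0.0f
    var k = 0
    while (k < rank) {
      s += src(k) * dst(k)
      k += 1
    }
    s
  }

  /**
   * For every source factor in `srcBlock`, keep the `num` highest-scoring destination
   * factors from `dstBlock`. A size-bounded priority queue (a min-heap on the score)
   * stands in for Spark's BoundedPriorityQueue: once it holds `num` entries, a new
   * candidate replaces the current minimum only if it scores higher.
   */
  def topKPerBlock(
      rank: Int,
      num: Int,
      srcBlock: Seq[(Int, Array[Float])],
      dstBlock: Seq[(Int, Array[Float])]): Seq[(Int, Int, Float)] = {
    val output = Seq.newBuilder[(Int, Int, Float)]
    // Min-heap ordered by score, so the weakest of the current top-k sits at the head.
    val pq = mutable.PriorityQueue.empty[(Int, Float)](
      Ordering.by[(Int, Float), Float](_._2).reverse)
    srcBlock.foreach { case (srcId, srcFactor) =>
      dstBlock.foreach { case (dstId, dstFactor) =>
        val score = dot(rank, srcFactor, dstFactor)
        if (pq.size < num) {
          pq.enqueue(dstId -> score)
        } else if (score > pq.head._2) {
          pq.dequeue()
          pq.enqueue(dstId -> score)
        }
      }
      // Emit this source's per-block top-k (order within the heap is irrelevant here).
      pq.foreach { case (dstId, score) => output += ((srcId, dstId, score)) }
      pq.clear()
    }
    output.result()
  }

  def main(args: Array[String]): Unit = {
    val rank = 2
    val src = Seq(1 -> Array(1.0f, 0.0f), 2 -> Array(0.0f, 1.0f))
    val dst = Seq(10 -> Array(0.9f, 0.1f), 11 -> Array(0.2f, 0.8f), 12 -> Array(0.5f, 0.5f))
    // Each source keeps its top 2 destinations from this block.
    topKPerBlock(rank, 2, src, dst).foreach(println)
  }
}

Keeping the heap bounded at num means each source/destination pair costs one rank-length dot product plus an O(log num) heap update, with only O(num) extra state per source row; in the patch, these per-block winners are then reduced to a global top-k (via [[TopByKeyAggregator]] on the ml side), which is why no large intermediate arrays or gemm buffers are needed.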