Skip to content

Commit

Permalink
[SPARK-20790][MLLIB] Correctly handle negative values for implicit fe…
Browse files Browse the repository at this point in the history
…edback in ALS

## What changes were proposed in this pull request?

Revert the handling of negative values in ALS with implicit feedback, so that the confidence is the absolute value of the rating and the preference is 0 for negative ratings. This was the original behavior.

## How was this patch tested?

This patch was tested with the existing unit tests and an added unit test to ensure that negative ratings are not ignored.

mengxr

Author: David Eis <deis@bloomberg.net>

Closes #18022 from davideis/bugfix/negative-rating.
  • Loading branch information
David Eis authored and srowen committed May 31, 2017
1 parent beed5e2 commit d52f636
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 10 deletions.
22 changes: 13 additions & 9 deletions mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -763,11 +763,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
/**
* Representing a normal equation to solve the following weighted least squares problem:
*
* minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - b,,i,,)^2^ + lambda * x^T^ x.
* minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - d,,i,,)^2^ + lambda * x^T^ x.
*
* Its normal equation is given by
*
* \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - b,,i,, a,,i,,) + lambda * x = 0.
* \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - d,,i,, a,,i,,) + lambda * x = 0.
*
* Distributing and letting b,,i,, = c,,i,, * d,,i,,
*
* \sum,,i,, c,,i,, a,,i,, a,,i,,^T^ x - b,,i,, a,,i,, + lambda * x = 0.
*/
private[recommendation] class NormalEquation(val k: Int) extends Serializable {

Expand Down Expand Up @@ -796,7 +800,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
copyToDouble(a)
blas.dspr(upper, k, c, da, 1, ata)
if (b != 0.0) {
blas.daxpy(k, c * b, da, 1, atb, 1)
blas.daxpy(k, b, da, 1, atb, 1)
}
this
}
Expand Down Expand Up @@ -1624,15 +1628,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
val srcFactor = sortedSrcFactors(blockId)(localIndex)
val rating = ratings(i)
if (implicitPrefs) {
// Extension to the original paper to handle b < 0. confidence is a function of |b|
// instead so that it is never negative. c1 is confidence - 1.0.
// Extension to the original paper to handle rating < 0. confidence is a function
// of |rating| instead so that it is never negative. c1 is confidence - 1.
val c1 = alpha * math.abs(rating)
// For rating <= 0, the corresponding preference is 0. So the term below is only added
// for rating > 0. Because YtY is already added, we need to adjust the scaling here.
if (rating > 0) {
// For rating <= 0, the corresponding preference is 0. So the second argument of add
// is only there for rating > 0.
if (rating > 0.0) {
numExplicits += 1
ls.add(srcFactor, (c1 + 1.0) / c1, c1)
}
ls.add(srcFactor, if (rating > 0.0) 1.0 + c1 else 0.0, c1)
} else {
ls.add(srcFactor, rating)
numExplicits += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.apache.spark.ml.recommendation.ALS._
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.recommendation.MatrixFactorizationModelSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
Expand Down Expand Up @@ -78,7 +79,7 @@ class ALSSuite
val k = 2
val ne0 = new NormalEquation(k)
.add(Array(1.0f, 2.0f), 3.0)
.add(Array(4.0f, 5.0f), 6.0, 2.0) // weighted
.add(Array(4.0f, 5.0f), 12.0, 2.0) // weighted
assert(ne0.k === k)
assert(ne0.triK === k * (k + 1) / 2)
// NumPy code that computes the expected values:
Expand Down Expand Up @@ -347,6 +348,37 @@ class ALSSuite
ALSSuite.genFactors(size, rank, random, a, b)
}

/**
* Train ALS using the given training set and parameters
* @param training training dataset
* @param rank rank of the matrix factorization
* @param maxIter max number of iterations
* @param regParam regularization constant
* @param implicitPrefs whether to use implicit preference
* @param numUserBlocks number of user blocks
* @param numItemBlocks number of item blocks
* @return a trained ALSModel
*/
def trainALS(
training: RDD[Rating[Int]],
rank: Int,
maxIter: Int,
regParam: Double,
implicitPrefs: Boolean = false,
numUserBlocks: Int = 2,
numItemBlocks: Int = 3): ALSModel = {
val spark = this.spark
import spark.implicits._
val als = new ALS()
.setRank(rank)
.setRegParam(regParam)
.setImplicitPrefs(implicitPrefs)
.setNumUserBlocks(numUserBlocks)
.setNumItemBlocks(numItemBlocks)
.setSeed(0)
als.fit(training.toDF())
}

/**
* Test ALS using the given training/test splits and parameters.
* @param training training dataset
Expand Down Expand Up @@ -455,6 +487,22 @@ class ALSSuite
targetRMSE = 0.3)
}

test("implicit feedback regression") {
val trainingWithNeg = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, -3)))
val trainingWithZero = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, 0)))
val modelWithNeg =
trainALS(trainingWithNeg, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true)
val modelWithZero =
trainALS(trainingWithZero, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true)
val userFactorsNeg = modelWithNeg.userFactors
val itemFactorsNeg = modelWithNeg.itemFactors
val userFactorsZero = modelWithZero.userFactors
val itemFactorsZero = modelWithZero.itemFactors
userFactorsNeg.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" ")))
userFactorsZero.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" ")))
assert(userFactorsNeg.intersect(userFactorsZero).count() == 0)
assert(itemFactorsNeg.intersect(itemFactorsZero).count() == 0)
}
test("using generic ID types") {
val (ratings, _) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2, noiseStd = 0.01)

Expand Down

0 comments on commit d52f636

Please sign in to comment.