-
Notifications
You must be signed in to change notification settings - Fork 398
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Aggregated LOCOs of SmartTextVectorizer outputs #308
Changes from 31 commits
ae0c05c
dae1c76
bfac6dd
f63be09
fb5c849
4ba11d8
f847850
1d1afc5
754891c
797222d
79467ba
250e1fd
7e13053
22eeb1b
9df934a
4829d39
aa0d662
8d28e1e
50653a0
33dfd4e
c37484a
6763b64
9033712
b427857
44075b2
92c160a
5302039
46ea9e0
c8c0cf9
21937db
f964818
ad1f32a
75fad54
5df59b5
2510762
2e1628a
57bcf6b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,7 @@ package com.salesforce.op.stages.impl.insights | |
import com.salesforce.op.UID | ||
import com.salesforce.op.features.types._ | ||
import com.salesforce.op.stages.base.unary.UnaryTransformer | ||
import com.salesforce.op.stages.impl.feature.{SmartTextMapVectorizer, SmartTextVectorizer} | ||
import com.salesforce.op.stages.impl.selector.SelectedModel | ||
import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel | ||
import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._ | ||
|
@@ -43,7 +44,8 @@ import org.apache.spark.ml.Model | |
import org.apache.spark.ml.linalg.Vectors | ||
import org.apache.spark.ml.param.{IntParam, Param} | ||
|
||
import scala.collection.mutable.PriorityQueue | ||
import scala.collection.mutable | ||
import scala.collection.mutable.ArrayBuffer | ||
|
||
/** | ||
* Creates record level insights for model predictions. Takes the model to explain as a constructor argument. | ||
|
@@ -55,6 +57,7 @@ import scala.collection.mutable.PriorityQueue | |
* derived features based on the LOCO insight. For MultiClassification, the value is from the predicted class | ||
* (i.e. the class having the highest probability) | ||
* - If Abs, returns at most topK elements : the topK derived features having highest absolute value of LOCO score. | ||
* | ||
* @param model model instance that you wish to explain | ||
* @param uid uid for instance | ||
*/ | ||
|
@@ -71,20 +74,15 @@ class RecordInsightsLOCO[T <: Model[T]] | |
) | ||
|
||
def setTopK(value: Int): this.type = set(topK, value) | ||
|
||
def getTopK: Int = $(topK) | ||
|
||
setDefault(topK -> 20) | ||
|
||
final val topKStrategy = new Param[String](parent = this, name = "topKStrategy", | ||
doc = "Whether returning topK based on absolute value or topK positives and negatives. For MultiClassification," + | ||
" the value is from the predicted class (i.e. the class having the highest probability)" | ||
) | ||
|
||
def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName) | ||
|
||
def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy)) | ||
|
||
setDefault(topKStrategy, TopKStrategy.Abs.entryName) | ||
|
||
private val modelApply = model match { | ||
|
@@ -93,107 +91,179 @@ class RecordInsightsLOCO[T <: Model[T]] | |
case m => toOPUnchecked(m).transformFn | ||
} | ||
private val labelDummy = RealNN(0.0) | ||
private lazy val histories = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory() | ||
private lazy val featureInfo = histories.map(_.toJson(false)) | ||
|
||
private lazy val featureInfo = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory().map(_.toJson(false)) | ||
/** | ||
* These are the name of the stages we want to perform an aggregation of the LOCO results over derived features | ||
*/ | ||
private val smartTextClassName = classOf[SmartTextVectorizer[_]].getSimpleName | ||
private val smartTextMapClassName = classOf[SmartTextMapVectorizer[_]].getSimpleName | ||
// Indices of features derived from SmartText(Map)Vectorizer | ||
private lazy val textFeatureIndices = histories | ||
.filter(_.parentFeatureStages.exists(s => s.contains(smartTextClassName) || s.contains(smartTextMapClassName))) | ||
.map(_.index) | ||
.distinct.sorted | ||
tovbinm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private def computeDiffs(i: Int, oldInd: Int, oldVal: Double, featureArray: Array[(Int, Double)], featureSize: Int, | ||
baseScore: Array[Double]): Array[Double] = { | ||
featureArray.update(i, (oldInd, 0)) | ||
val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score | ||
private def computeDiffs | ||
( | ||
i: Int, | ||
oldInd: Int, | ||
oldVal: Double, | ||
featureArray: Array[(Int, Double)], | ||
featureSize: Int, | ||
baseScore: Array[Double] | ||
): Array[Double] = { | ||
featureArray.update(i, (oldInd, 0.0)) | ||
val score = modelApply(labelDummy, Vectors.sparse(featureSize, featureArray).toOPVector).score | ||
val diffs = baseScore.zip(score).map { case (b, s) => b - s } | ||
featureArray.update(i, (oldInd, oldVal)) | ||
diffs | ||
} | ||
|
||
private def returnTopPosNeg(filledSize: Int, featureArray: Array[(Int, Double)], featureSize: Int, | ||
baseScore: Array[Double], k: Int, indexToExamine: Int): Seq[(Int, Double, Array[Double])] = { | ||
// Heap that will contain the top K positive LOCO values | ||
val positiveMaxHeap = PriorityQueue.empty(MinScore) | ||
// Heap that will contain the top K negative LOCO values | ||
val negativeMaxHeap = PriorityQueue.empty(MaxScore) | ||
// for each element of the feature vector != 0.0 | ||
// Size of positive heap | ||
var positiveCount = 0 | ||
// Size of negative heap | ||
var negativeCount = 0 | ||
var i = 0 | ||
while (i < filledSize) { | ||
private def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] = { | ||
left.zipAll(right, 0.0, 0.0).map { case (l, r) => l + r } | ||
} | ||
|
||
private def returnTopPosNeg | ||
( | ||
featureArray: Array[(Int, Double)], | ||
featureSize: Int, | ||
baseScore: Array[Double], | ||
k: Int, | ||
indexToExamine: Int | ||
): Seq[LOCOValue] = { | ||
|
||
val minMaxHeap = new MinMaxHeap(k) | ||
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])] | ||
for {i <- featureArray.indices} { | ||
val (oldInd, oldVal) = featureArray(i) | ||
val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore) | ||
val max = diffToExamine(indexToExamine) | ||
val history = histories(oldInd) | ||
|
||
if (max > 0.0) { // if positive LOCO then add it to positive heap | ||
positiveMaxHeap.enqueue((i, max, diffToExamine)) | ||
positiveCount += 1 | ||
if (positiveCount > k) { // remove the lowest element if the heap size goes from 5 to 6 | ||
positiveMaxHeap.dequeue() | ||
// Let's check the indicator value and descriptor value | ||
// If those values are empty, the field is likely to be a derived text feature (hashing tf output) | ||
if (history.indicatorValue.isEmpty && history.descriptorValue.isEmpty) { | ||
// Name of the field | ||
val rawName = history.parentFeatureStages match { | ||
case s if s.exists(_.contains(smartTextClassName)) => history.parentFeatureOrigins.headOption | ||
case s if s.exists(_.contains(smartTextMapClassName)) => history.grouping | ||
case _ => None | ||
tovbinm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
// Update the aggregation map | ||
for {name <- rawName} { | ||
val (indices, array) = aggregationMap.getOrElse(name, (Array.empty[Int], Array.empty[Double])) | ||
aggregationMap.update(name, (indices :+ i, sumArrays(array, diffToExamine))) | ||
} | ||
} else if (max < 0.0) { // if negative LOCO then add it to negative heap | ||
negativeMaxHeap.enqueue((i, max, diffToExamine)) | ||
negativeCount += 1 | ||
if (negativeCount > k) { // remove the highest element if the heap size goes from 5 to 6 | ||
negativeMaxHeap.dequeue() | ||
} // Not keeping LOCOs with value 0 | ||
} else { | ||
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) | ||
} | ||
i += 1 | ||
} | ||
val topPositive = positiveMaxHeap.dequeueAll | ||
val topNegative = negativeMaxHeap.dequeueAll | ||
(topPositive ++ topNegative) | ||
|
||
// Adding LOCO results from aggregation map into heaps | ||
for {(indices, ar) <- aggregationMap.values} { | ||
// The index here is arbitrary | ||
val (i, n) = (indices.head, indices.length) | ||
val diffToExamine = ar.map(_ / n) | ||
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) | ||
} | ||
|
||
minMaxHeap.dequeueAll | ||
} | ||
|
||
override def transformFn: OPVector => TextMap = (features) => { | ||
override def transformFn: OPVector => TextMap = features => { | ||
val baseResult = modelApply(labelDummy, features) | ||
val baseScore = baseResult.score | ||
|
||
// TODO sparse implementation only works if changing values to zero - use dense vector to test effect of zeros | ||
// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros | ||
val featuresSparse = features.value.toSparse | ||
val featureArray = featuresSparse.indices.zip(featuresSparse.values) | ||
val filledSize = featureArray.length | ||
val res = ArrayBuffer.empty[(Int, Double)] | ||
featuresSparse.foreachActive((i, v) => res += i -> v) | ||
// Besides non 0 values, we want to check the text features as well | ||
textFeatureIndices.foreach(i => if (!featuresSparse.indices.contains(i)) res += i -> 0.0) | ||
val featureArray = res.toArray | ||
val featureSize = featuresSparse.size | ||
|
||
val k = $(topK) | ||
// Index where to examine the difference in the prediction vector | ||
val indexToExamine = baseScore.length match { | ||
case 0 => throw new RuntimeException("model does not produce scores for insights") | ||
case 1 => 0 | ||
case 2 => 1 | ||
case n if (n > 2) => baseResult.prediction.toInt | ||
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability) | ||
case n if n > 2 => baseResult.prediction.toInt | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I am not following - why does the prediction value become an index?! There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We want to return the LOCO score of the predicted class. Let's say for a row the LOCOs are [...] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's add some docs to explain it, because it looks weird to me. |
||
} | ||
val topPosNeg = returnTopPosNeg(filledSize, featureArray, featureSize, baseScore, k, indexToExamine) | ||
val topPosNeg = returnTopPosNeg(featureArray, featureSize, baseScore, k, indexToExamine) | ||
val top = getTopKStrategy match { | ||
case TopKStrategy.Abs => topPosNeg.sortBy { case (_, v, _) => -math.abs(v) }.take(k) | ||
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case (_, v, _) => -v } | ||
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k) | ||
// Take top K positive and top K negative LOCOs, hence 2 * K | ||
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k) | ||
} | ||
|
||
top.map { case (k, _, v) => RecordInsightsParser.insightToText(featureInfo(featureArray(k)._1), v) } | ||
.toMap.toTextMap | ||
top.map { case LOCOValue(i, _, diffs) => | ||
RecordInsightsParser.insightToText(featureInfo(featureArray(i)._1), diffs) | ||
}.toMap.toTextMap | ||
} | ||
|
||
} | ||
|
||
/** | ||
* Heap to keep top K of min and max LOCO values | ||
* | ||
* @param k number of values to keep | ||
*/ | ||
private class MinMaxHeap(k: Int) { | ||
|
||
// Heap that will contain the top K positive LOCO values | ||
private val positives = mutable.PriorityQueue.empty(MinScore) | ||
|
||
// Heap that will contain the top K negative LOCO values | ||
private val negatives = mutable.PriorityQueue.empty(MaxScore) | ||
|
||
def enqueue(loco: LOCOValue): Unit = { | ||
// Not keeping LOCOs with value 0, i.e. for each element of the feature vector != 0.0 | ||
if (loco.value > 0.0) { // if positive LOCO then add it to positive heap | ||
positives.enqueue(loco) | ||
// remove the lowest element if the heap size goes above k | ||
if (positives.length > k) positives.dequeue() | ||
} else if (loco.value < 0.0) { // if negative LOCO then add it to negative heap | ||
negatives.enqueue(loco) | ||
// remove the highest element if the heap size goes above k | ||
if (negatives.length > k) negatives.dequeue() | ||
} | ||
} | ||
|
||
def dequeueAll: Seq[LOCOValue] = positives.dequeueAll ++ negatives.dequeueAll | ||
|
||
} | ||
|
||
/** | ||
* LOCO value container | ||
* | ||
* @param i feature value index | ||
* @param value value - min or max, depending on the ordering | ||
* @param diffs scores diff | ||
*/ | ||
private case class LOCOValue(i: Int, value: Double, diffs: Array[Double]) | ||
|
||
/** | ||
* Ordering of the heap that removes lowest score | ||
*/ | ||
private object MinScore extends Ordering[(Int, Double, Array[Double])] { | ||
def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int = | ||
y._2 compare x._2 | ||
private object MinScore extends Ordering[LOCOValue] { | ||
def compare(x: LOCOValue, y: LOCOValue): Int = y.value compare x.value | ||
} | ||
|
||
/** | ||
* Ordering of the heap that removes highest score | ||
*/ | ||
private object MaxScore extends Ordering[(Int, Double, Array[Double])] { | ||
def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int = | ||
x._2 compare y._2 | ||
private object MaxScore extends Ordering[LOCOValue] { | ||
def compare(x: LOCOValue, y: LOCOValue): Int = x.value compare y.value | ||
} | ||
|
||
sealed abstract class TopKStrategy(val name: String) extends EnumEntry with Serializable | ||
|
||
object TopKStrategy extends Enum[TopKStrategy] { | ||
val values = findValues | ||
|
||
case object Abs extends TopKStrategy("abs") | ||
|
||
case object PositiveNegative extends TopKStrategy("positive and negative") | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can there be a custom text vectorizer? If yes, shouldn't we handle that too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, a custom text vectorizer does not necessarily return hashes as its output. This aggregation is very limited and doesn't cover all the use cases yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, thanks.