Aggregated LOCOs of SmartTextVectorizer outputs #308

Merged · 37 commits · Jun 3, 2019
Changes from 34 commits
ae0c05c
Aggregated texts
mweilsalesforce May 8, 2019
dae1c76
Merge branch 'master' into mw/Aggregation
michaelweilsalesforce May 8, 2019
bfac6dd
Merge branch 'master' into mw/Aggregation
michaelweilsalesforce May 10, 2019
f63be09
sum array
mweilsalesforce May 10, 2019
fb5c849
Merge branch 'master' into mw/Aggregation
michaelweilsalesforce May 13, 2019
4ba11d8
getSimpleName fix
mweilsalesforce May 13, 2019
f847850
Merge branch 'mw/Aggregation' of https://github.com/salesforce/Transm…
mweilsalesforce May 13, 2019
1d1afc5
Added functionality for SmartTextMapVectorizer
mweilsalesforce May 14, 2019
754891c
Merge branch 'master' into mw/Aggregation
tovbinm May 16, 2019
797222d
Adding comments
mweilsalesforce May 20, 2019
79467ba
Merge branch 'master' into mw/Aggregation
tovbinm May 20, 2019
250e1fd
Merge branch 'master' into mw/Aggregation
michaelweilsalesforce May 21, 2019
7e13053
fixes
tovbinm May 24, 2019
22eeb1b
cleanup
tovbinm May 24, 2019
9df934a
Merge branch 'master' into mw/Aggregation
tovbinm May 28, 2019
4829d39
updates
tovbinm May 28, 2019
aa0d662
Merge branch 'mw/Aggregation' of github.com:salesforce/TransmogrifAI …
tovbinm May 28, 2019
8d28e1e
flatmap
tovbinm May 28, 2019
50653a0
typoo
tovbinm May 28, 2019
33dfd4e
cleanup
tovbinm May 28, 2019
c37484a
avoid .value on sparse vector
tovbinm May 28, 2019
6763b64
use breeze directly
tovbinm May 28, 2019
9033712
fixin
tovbinm May 28, 2019
b427857
ff
tovbinm May 28, 2019
44075b2
added MinMaxHeap class
tovbinm May 28, 2019
92c160a
Merge branch 'master' into mw/Aggregation
tovbinm May 29, 2019
5302039
Merge branch 'master' into mw/Aggregation
michaelweilsalesforce May 30, 2019
46ea9e0
Take top(2k) for PositiveAndNegative
mweilsalesforce May 30, 2019
c8c0cf9
Merge branch 'mw/Aggregation' of https://github.com/salesforce/Transm…
mweilsalesforce May 30, 2019
21937db
Fix Merge Conflict
mweilsalesforce May 30, 2019
f964818
Adding comment for Multiclass case
mweilsalesforce May 30, 2019
ad1f32a
Aggregation based on type and not on stage anymore
mweilsalesforce Jun 3, 2019
75fad54
Fix Scalastyle
mweilsalesforce Jun 3, 2019
5df59b5
use feature.typename
tovbinm Jun 3, 2019
2510762
Stages won't appear in key names + fix bug of same key names in diffe…
mweilsalesforce Jun 3, 2019
2e1628a
Merge branch 'mw/Aggregation' of https://github.com/salesforce/Transm…
mweilsalesforce Jun 3, 2019
57bcf6b
Fix merge conflict
mweilsalesforce Jun 3, 2019
@@ -43,7 +43,8 @@ import org.apache.spark.ml.Model
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.{IntParam, Param}

import scala.collection.mutable.PriorityQueue
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* Creates record level insights for model predictions. Takes the model to explain as a constructor argument.
@@ -55,6 +56,7 @@ import scala.collection.mutable.PriorityQueue
 * derived features based on the LOCO insight. For MultiClassification, the value is from the predicted class
* (i.e. the class having the highest probability)
* - If Abs, returns at most topK elements : the topK derived features having highest absolute value of LOCO score.
*
* @param model model instance that you wish to explain
* @param uid uid for instance
*/
@@ -71,20 +73,15 @@ class RecordInsightsLOCO[T <: Model[T]]
)

def setTopK(value: Int): this.type = set(topK, value)

def getTopK: Int = $(topK)

setDefault(topK -> 20)

final val topKStrategy = new Param[String](parent = this, name = "topKStrategy",
doc = "Whether to return the topK based on absolute value, or the topK positives and negatives. For MultiClassification," +
" the value is from the predicted class (i.e. the class having the highest probability)"
)

def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName)

def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy))

setDefault(topKStrategy, TopKStrategy.Abs.entryName)

private val modelApply = model match {
@@ -93,107 +90,183 @@ class RecordInsightsLOCO[T <: Model[T]]
case m => toOPUnchecked(m).transformFn
}
private val labelDummy = RealNN(0.0)
private lazy val histories = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory()
private lazy val featureInfo = histories.map(_.toJson(false))

/**
* These are the name of the types we want to perform an aggregation of the LOCO results over derived features
*/
private val textTypes =
Set(FeatureType.typeName[Text], FeatureType.typeName[TextArea], FeatureType.typeName[TextList])
private val textMapTypes =
Set(FeatureType.typeName[TextMap], FeatureType.typeName[TextAreaMap])

private lazy val featureInfo = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory().map(_.toJson(false))
// Indices of features derived from Text(Map)Vectorizer
private lazy val textFeatureIndices = histories
.filter(_.parentFeatureType.exists((textTypes ++ textMapTypes).contains))
.map(_.index)
.distinct.sorted
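The index selection above can be sketched standalone. Note that `ColumnHistory` and the type-name strings below are simplified assumptions standing in for `OpVectorMetadata`'s column history, not the real TransmogrifAI types:

```scala
// Simplified stand-in for an OpVectorMetadata column history entry
// (an assumption for illustration, not the real TransmogrifAI type).
case class ColumnHistory(index: Int, parentFeatureType: Seq[String])

val textLikeTypes = Set("Text", "TextArea", "TextList", "TextMap", "TextAreaMap")

// Indices of vector columns whose parent feature is a text type,
// deduplicated and sorted, mirroring textFeatureIndices above.
def textIndices(histories: Seq[ColumnHistory]): Seq[Int] =
  histories
    .filter(_.parentFeatureType.exists(textLikeTypes.contains))
    .map(_.index)
    .distinct.sorted
```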

private def computeDiffs(i: Int, oldInd: Int, oldVal: Double, featureArray: Array[(Int, Double)], featureSize: Int,
baseScore: Array[Double]): Array[Double] = {
featureArray.update(i, (oldInd, 0))
val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score
private def computeDiffs
(
i: Int,
oldInd: Int,
oldVal: Double,
featureArray: Array[(Int, Double)],
featureSize: Int,
baseScore: Array[Double]
): Array[Double] = {
featureArray.update(i, (oldInd, 0.0))
val score = modelApply(labelDummy, Vectors.sparse(featureSize, featureArray).toOPVector).score
val diffs = baseScore.zip(score).map { case (b, s) => b - s }
featureArray.update(i, (oldInd, oldVal))
diffs
}
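The leave-one-covariate-out idea that `computeDiffs` implements can be shown in a minimal sketch: zero out one feature, re-score, and report the per-class difference against the base score. Here `score` is a hypothetical stand-in for the model's scoring function, not the real TransmogrifAI API:

```scala
// Minimal LOCO sketch: for each feature i, zero it out, re-score, and
// compute base - score as the per-class diff, then restore the value.
def locoDiffs(
  features: Array[Double],
  score: Array[Double] => Array[Double]
): Array[Array[Double]] = {
  val base = score(features)
  features.indices.map { i =>
    val old = features(i)
    features(i) = 0.0                 // leave feature i out
    val withoutI = score(features)
    features(i) = old                 // restore the original value
    base.zip(withoutI).map { case (b, s) => b - s }
  }.toArray
}
```

For a linear model, the diff for feature i is simply its contribution to the score, which is the intuition behind reading LOCO values as feature importances.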

private def returnTopPosNeg(filledSize: Int, featureArray: Array[(Int, Double)], featureSize: Int,
baseScore: Array[Double], k: Int, indexToExamine: Int): Seq[(Int, Double, Array[Double])] = {
// Heap that will contain the top K positive LOCO values
val positiveMaxHeap = PriorityQueue.empty(MinScore)
// Heap that will contain the top K negative LOCO values
val negativeMaxHeap = PriorityQueue.empty(MaxScore)
// for each element of the feature vector != 0.0
// Size of positive heap
var positiveCount = 0
// Size of negative heap
var negativeCount = 0
var i = 0
while (i < filledSize) {
private def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] = {
left.zipAll(right, 0.0, 0.0).map { case (l, r) => l + r }
}
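The `sumArrays` helper pads the shorter array with zeros before summing, so diff arrays of different lengths can still be aggregated. A standalone sketch of the same behavior:

```scala
// Element-wise sum of two arrays of possibly different lengths,
// padding the shorter one with 0.0 (mirrors sumArrays above).
def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] =
  left.zipAll(right, 0.0, 0.0).map { case (l, r) => l + r }

val summed = sumArrays(Array(1.0, 2.0, 3.0), Array(10.0, 20.0))
// summed is Array(11.0, 22.0, 3.0)
```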

private def returnTopPosNeg
(
featureArray: Array[(Int, Double)],
featureSize: Int,
baseScore: Array[Double],
k: Int,
indexToExamine: Int
): Seq[LOCOValue] = {

val minMaxHeap = new MinMaxHeap(k)
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])]
for {i <- featureArray.indices} {
val (oldInd, oldVal) = featureArray(i)
val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore)
val max = diffToExamine(indexToExamine)
val history = histories(oldInd)

if (max > 0.0) { // if positive LOCO then add it to positive heap
positiveMaxHeap.enqueue((i, max, diffToExamine))
positiveCount += 1
if (positiveCount > k) { // remove the lowest element if the heap size goes from 5 to 6
positiveMaxHeap.dequeue()
// Let's check the indicator value and descriptor value
// If those values are empty, the field is likely to be a derived text feature (hashing tf output)
if (textFeatureIndices.contains(oldInd) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty) {
Review comment (Contributor): Is this check here to make sure we don't accidentally pick up text features that were determined to be categorical by the smart text vectorizer? If so, we should probably make an easier way to tell what transformations were applied to the text.

Reply (Contributor, Author): Not only the smart text vectorizer. Any feature with indicator/descriptor values that is derived from a text transformation is likely to be easily interpreted once LOCO is done.

// Name of the field
val rawName = history.parentFeatureType match {
case s if s.exists(textMapTypes.contains) => history.grouping
case s if s.exists(textTypes.contains) => history.parentFeatureOrigins.headOption
case s => throw new Error(s"type should be Text or TextMap, here ${s.mkString(",")}")
}
// Update the aggregation map
for {name <- rawName} {
val key = name + "_" + history.parentFeatureStages.mkString(",")
Review comment (Collaborator): The key value can be quite large if we concatenate all the parent stages. What's the rationale behind it? @mweilsalesforce

Reply (Contributor, Author): It is possible to apply different transformations on the same feature. On second thought, maybe we should aggregate all the derived features even if two different transformations were applied.

Reply (Contributor, Author): Ok, fixed.

val (indices, array) = aggregationMap.getOrElse(key, (Array.empty[Int], Array.empty[Double]))
aggregationMap.update(key, (indices :+ i, sumArrays(array, diffToExamine)))
}
} else if (max < 0.0) { // if negative LOCO then add it to negative heap
negativeMaxHeap.enqueue((i, max, diffToExamine))
negativeCount += 1
if (negativeCount > k) { // remove the highest element if the heap size goes from 5 to 6
negativeMaxHeap.dequeue()
} // Not keeping LOCOs with value 0
} else {
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}
i += 1
}
val topPositive = positiveMaxHeap.dequeueAll
val topNegative = negativeMaxHeap.dequeueAll
(topPositive ++ topNegative)

// Adding LOCO results from aggregation map into heaps
for {(indices, ar) <- aggregationMap.values} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val diffToExamine = ar.map(_ / n)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}

minMaxHeap.dequeueAll
}
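The aggregation step above sums the per-derived-feature LOCO diff arrays under one key per raw text feature and then averages by the number of derived features aggregated. A hypothetical standalone sketch of that step, over plain `(key, diffs)` pairs:

```scala
import scala.collection.mutable

// Sum LOCO diff arrays per key, then average by the number of
// derived features aggregated under that key (sketch of the
// aggregationMap logic above, not the real implementation).
def aggregateLoco(
  diffsByKey: Seq[(String, Array[Double])]
): Map[String, Array[Double]] = {
  val acc = mutable.Map.empty[String, (Int, Array[Double])]
  for ((key, diffs) <- diffsByKey) {
    val (n, sum) = acc.getOrElse(key, (0, Array.empty[Double]))
    val summed = sum.zipAll(diffs, 0.0, 0.0).map { case (l, r) => l + r }
    acc.update(key, (n + 1, summed))
  }
  acc.map { case (k, (n, sum)) => k -> sum.map(_ / n) }.toMap
}
```

Averaging (rather than just summing) keeps the aggregated LOCO on the same scale as the per-feature LOCOs regardless of how many hashed columns a text feature produced.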

override def transformFn: OPVector => TextMap = (features) => {
override def transformFn: OPVector => TextMap = features => {
val baseResult = modelApply(labelDummy, features)
val baseScore = baseResult.score

// TODO sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
val featuresSparse = features.value.toSparse
val featureArray = featuresSparse.indices.zip(featuresSparse.values)
val filledSize = featureArray.length
val res = ArrayBuffer.empty[(Int, Double)]
featuresSparse.foreachActive((i, v) => res += i -> v)
// Besides non 0 values, we want to check the text features as well
textFeatureIndices.foreach(i => if (!featuresSparse.indices.contains(i)) res += i -> 0.0)
val featureArray = res.toArray
val featureSize = featuresSparse.size

val k = $(topK)
// Index where to examine the difference in the prediction vector
val indexToExamine = baseScore.length match {
case 0 => throw new RuntimeException("model does not produce scores for insights")
case 1 => 0
case 2 => 1
case n if (n > 2) => baseResult.prediction.toInt
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability)
case n if n > 2 => baseResult.prediction.toInt
Review comment (Collaborator): I am not following. Why does the prediction value become an index?

Reply (Contributor, Author): We want to return the LOCO score of the predicted class. Say the LOCOs for a row are 0 -> LOCO_0, 1 -> LOCO_1, 2 -> LOCO_2, and the model predicts class 1 on this row; then we want to return LOCO_1.

Reply (Collaborator): Let's add some docs to explain it, because it looks weird to me.

}
val topPosNeg = returnTopPosNeg(filledSize, featureArray, featureSize, baseScore, k, indexToExamine)
val topPosNeg = returnTopPosNeg(featureArray, featureSize, baseScore, k, indexToExamine)
val top = getTopKStrategy match {
case TopKStrategy.Abs => topPosNeg.sortBy { case (_, v, _) => -math.abs(v) }.take(k)
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case (_, v, _) => -v }
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
// Take top K positive and top K negative LOCOs, hence 2 * K
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
}

top.map { case (k, _, v) => RecordInsightsParser.insightToText(featureInfo(featureArray(k)._1), v) }
.toMap.toTextMap
top.map { case LOCOValue(i, _, diffs) =>
RecordInsightsParser.insightToText(featureInfo(featureArray(i)._1), diffs)
}.toMap.toTextMap
}

}
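The two top-K strategies in `transformFn` can be sketched over plain `(index, value)` pairs instead of `LOCOValue`. This is a simplified illustration, not the PR's code:

```scala
// Abs strategy: top k by absolute LOCO value.
// PositiveNegative strategy: take up to 2*k values sorted descending —
// upstream the heap already holds at most k positives and k negatives.
def selectTop(locos: Seq[(Int, Double)], k: Int, absolute: Boolean): Seq[(Int, Double)] =
  if (absolute) locos.sortBy { case (_, v) => -math.abs(v) }.take(k)
  else locos.sortBy { case (_, v) => -v }.take(2 * k)
```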

/**
* Heap to keep top K of min and max LOCO values
*
* @param k number of values to keep
*/
private class MinMaxHeap(k: Int) {

// Heap that will contain the top K positive LOCO values
private val positives = mutable.PriorityQueue.empty(MinScore)

// Heap that will contain the top K negative LOCO values
private val negatives = mutable.PriorityQueue.empty(MaxScore)

def enqueue(loco: LOCOValue): Unit = {
// Not keeping LOCOs with value 0, i.e. for each element of the feature vector != 0.0
if (loco.value > 0.0) { // if positive LOCO then add it to positive heap
positives.enqueue(loco)
// remove the lowest element if the heap size goes above k
if (positives.length > k) positives.dequeue()
} else if (loco.value < 0.0) { // if negative LOCO then add it to negative heap
negatives.enqueue(loco)
// remove the highest element if the heap size goes above k
if (negatives.length > k) negatives.dequeue()
}
}

def dequeueAll: Seq[LOCOValue] = positives.dequeueAll ++ negatives.dequeueAll

}
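The `MinMaxHeap` idea — two bounded priority queues, one keeping the k largest positive values and one keeping the k most negative — can be sketched standalone over plain doubles. The class below is a simplified illustration with assumed names, not the PR's `MinMaxHeap`:

```scala
import scala.collection.mutable

// Keep the top-k positive and top-k negative values.
final class TopBottomK(k: Int) {
  // min-heap over positives: the smallest kept positive is dequeued first
  private val positives = mutable.PriorityQueue.empty[Double](Ordering[Double].reverse)
  // max-heap over negatives: the least negative kept value is dequeued first
  private val negatives = mutable.PriorityQueue.empty[Double](Ordering[Double])

  def enqueue(v: Double): Unit =
    if (v > 0.0) { positives.enqueue(v); if (positives.length > k) positives.dequeue() }
    else if (v < 0.0) { negatives.enqueue(v); if (negatives.length > k) negatives.dequeue() }
    // zeros are dropped, matching the LOCO heap above

  def dequeueAll: Seq[Double] = positives.dequeueAll ++ negatives.dequeueAll
}

val h = new TopBottomK(2)
Seq(5.0, -1.0, 3.0, 0.0, -4.0, 7.0, -2.0).foreach(h.enqueue)
// h now holds the two largest positives (7.0, 5.0)
// and the two most negative values (-4.0, -2.0)
```

Bounding each queue at k keeps the pass over the feature vector O(n log k) instead of sorting all n LOCO values.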

/**
* LOCO value container
*
* @param i feature value index
* @param value value - min or max, depending on the ordering
* @param diffs scores diff
*/
private case class LOCOValue(i: Int, value: Double, diffs: Array[Double])

/**
* Ordering of the heap that removes lowest score
*/
private object MinScore extends Ordering[(Int, Double, Array[Double])] {
def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int =
y._2 compare x._2
private object MinScore extends Ordering[LOCOValue] {
def compare(x: LOCOValue, y: LOCOValue): Int = y.value compare x.value
}

/**
* Ordering of the heap that removes highest score
*/
private object MaxScore extends Ordering[(Int, Double, Array[Double])] {
def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int =
x._2 compare y._2
private object MaxScore extends Ordering[LOCOValue] {
def compare(x: LOCOValue, y: LOCOValue): Int = x.value compare y.value
}

sealed abstract class TopKStrategy(val name: String) extends EnumEntry with Serializable

object TopKStrategy extends Enum[TopKStrategy] {
val values = findValues

case object Abs extends TopKStrategy("abs")

case object PositiveNegative extends TopKStrategy("positive and negative")

}