Commit

Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
jkbradley committed Aug 18, 2014
2 parents 60c72d9 + 9306b8c commit 8d1e555
Showing 5 changed files with 116 additions and 30 deletions.
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -200,6 +200,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.shuffleReadBytes += shuffleReadDelta
execSummary.shuffleRead += shuffleReadDelta

val inputBytesDelta =
(taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
- oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
stageData.inputBytes += inputBytesDelta
execSummary.inputBytes += inputBytesDelta

val diskSpillDelta =
taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
stageData.diskBytesSpilled += diskSpillDelta
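
As an aside, the Option handling in the new input-bytes delta above can be shown in isolation.
A minimal, self-contained sketch with simplified stand-in types (these are not Spark's actual
metrics classes):

case class InputMetrics(bytesRead: Long)
case class TaskMetrics(inputMetrics: Option[InputMetrics])

// Either side may be absent: the task may report no input metrics, and there may be
// no previously recorded metrics at all; a missing value counts as 0 bytes.
def inputBytesDelta(current: TaskMetrics, previous: Option[TaskMetrics]): Long =
  current.inputMetrics.map(_.bytesRead).getOrElse(0L) -
    previous.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)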
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -22,7 +22,7 @@ import org.scalatest.Matchers

import org.apache.spark._
import org.apache.spark.{LocalSparkContext, SparkConf, Success}
import org.apache.spark.executor.{ShuffleWriteMetrics, ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.executor._
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils

@@ -150,6 +150,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers
taskMetrics.executorRunTime = base + 4
taskMetrics.diskBytesSpilled = base + 5
taskMetrics.memoryBytesSpilled = base + 6
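// Attach input metrics as well, so the listener's new input-bytes accounting is exercised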
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.bytesRead = base + 7
taskMetrics
}

@@ -182,6 +185,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(stage1Data.diskBytesSpilled == 205)
assert(stage0Data.memoryBytesSpilled == 112)
assert(stage1Data.memoryBytesSpilled == 206)
assert(stage0Data.inputBytes == 114)
assert(stage1Data.inputBytes == 207)
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 2)
assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
@@ -208,6 +213,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(stage1Data.diskBytesSpilled == 610)
assert(stage0Data.memoryBytesSpilled == 412)
assert(stage1Data.memoryBytesSpilled == 612)
assert(stage0Data.inputBytes == 414)
assert(stage1Data.inputBytes == 614)
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 302)
assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get
9 changes: 5 additions & 4 deletions dev/create-release/create-release.sh
@@ -117,12 +117,13 @@ make_binary_release() {
spark-$RELEASE_VERSION-bin-$NAME.tgz.sha
}

make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4"
make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0"
make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4" &
make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0" &
make_binary_release "hadoop2" \
"-Phive -Phive-thriftserver -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"
"-Phive -Phive-thriftserver -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" &
make_binary_release "hadoop2-without-hive" \
"-Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"
"-Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" &
wait

# Copy data
echo "Copying release tarballs"
63 changes: 62 additions & 1 deletion docs/mllib-feature-extraction.md
@@ -9,4 +9,65 @@ displayTitle: <a href="mllib-guide.html">MLlib</a> - Feature Extraction

## Word2Vec

## TFIDF
Word2Vec computes distributed vector representations of words. The main advantage of distributed
representations is that similar words are close in the vector space, which makes generalization
to novel patterns easier and model estimation more robust. Distributed vector representations
have been shown to be useful in many natural language processing applications such as named
entity recognition, disambiguation, parsing, tagging and machine translation.

### Model

In our implementation of Word2Vec, we used the skip-gram model. The training objective of
skip-gram is to learn word vector representations that are good at predicting a word's context
within the same sentence.
Mathematically, given a sequence of training words `$w_1, w_2, \dots, w_T$`, the objective of the
skip-gram model is to maximize the average log-likelihood
`\[
\frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t)
\]`
where $k$ is the size of the training window.
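
For instance, with $T = 3$ and window size $k = 1$ (skipping out-of-range indices and the
trivial $j = 0$ term, as is conventional), the objective works out to
`\[
\frac{1}{3}\left( \log p(w_2 | w_1) + \log p(w_1 | w_2) + \log p(w_3 | w_2) + \log p(w_2 | w_3) \right).
\]`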

In the skip-gram model, every word $w$ is associated with two vectors $u_w$ and $v_w$, which are
the vector representations of $w$ as a word and as a context, respectively. The probability of
correctly predicting word $w_i$ given word $w_j$ is determined by the softmax model, which is
`\[
p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})}
\]`
where $V$ is the vocabulary size.
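
To make the formula concrete, here is a small self-contained sketch (an illustration, not
MLlib's implementation) that evaluates this probability from explicit word and context vectors:

{% highlight scala %}
// u(i): word vector of w_i; v(j): context vector of w_j.
def dot(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => x * y }.sum

// p(w_i | w_j) = exp(u_i . v_j) / sum over the vocabulary of exp(u_l . v_j)
def softmaxProb(u: Array[Array[Double]], v: Array[Array[Double]], i: Int, j: Int): Double =
  math.exp(dot(u(i), v(j))) / u.map(uw => math.exp(dot(uw, v(j)))).sum
{% endhighlight %}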

The skip-gram model with softmax is expensive because the cost of computing $\log p(w_i | w_j)$
is proportional to $V$, which can easily be in the order of millions. To speed up training of
Word2Vec, we used hierarchical softmax, which reduces the complexity of computing
$\log p(w_i | w_j)$ to $O(\log(V))$.
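
The saving comes from arranging the vocabulary in a binary tree (a Huffman tree in the original
word2vec), so that $\log p(w_i | w_j)$ decomposes into a sum over the inner nodes on the path to
$w_i$. A sketch of the idea with illustrative names (not MLlib's API):

{% highlight scala %}
def sigmoid(x: Double): Double = 1.0 / (1.0 + math.exp(-x))

def dotProd(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => x * y }.sum

// code(d) is the branch bit at depth d of the word's path; innerVecs(d) is the vector of
// the inner node at that depth. The loop runs code.length times, which is O(log V).
def logProb(code: Array[Int], innerVecs: Array[Array[Double]], context: Array[Double]): Double =
  code.indices.map { d =>
    val f = dotProd(innerVecs(d), context)
    math.log(if (code(d) == 0) sigmoid(f) else sigmoid(-f))
  }.sum
{% endhighlight %}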

### Example

The example below demonstrates how to load a text file, parse it as an RDD of `Seq[String]`,
construct a `Word2Vec` instance, and then fit a `Word2VecModel` with the input data. Finally,
we display the top 40 synonyms of a specified word. To run the example, first download
the [text8](http://mattmahoney.net/dc/text8.zip) data and extract it to your preferred directory.
Here we assume that the extracted file is named `text8` and that it is in the same directory from
which you run the Spark shell.

<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.Word2Vec

val input = sc.textFile("text8").map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()

val model = word2vec.fit(input)

val synonyms = model.findSynonyms("china", 40)

for ((synonym, cosineSimilarity) <- synonyms) {
println(s"$synonym $cosineSimilarity")
}
{% endhighlight %}
</div>
</div>

## TFIDF
59 changes: 35 additions & 24 deletions mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -34,6 +34,7 @@ import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd._
import org.apache.spark.util.Utils
import org.apache.spark.util.random.XORShiftRandom
import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap

/**
* Entry in vocabulary
@@ -287,11 +288,12 @@ class Word2Vec extends Serializable with Logging {
var syn0Global =
Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize)
var syn1Global = new Array[Float](vocabSize * vectorSize)

var alpha = startingAlpha
for (k <- 1 to numIterations) {
val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) =>
val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8))
val syn0Modify = new Array[Int](vocabSize)
val syn1Modify = new Array[Int](vocabSize)
val model = iter.foldLeft((syn0Global, syn1Global, 0, 0)) {
case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
var lwc = lastWordCount
@@ -321,7 +323,8 @@ class Word2Vec extends Serializable with Logging {
// Hierarchical softmax
var d = 0
while (d < bcVocab.value(word).codeLen) {
val l2 = bcVocab.value(word).point(d) * vectorSize
val inner = bcVocab.value(word).point(d)
val l2 = inner * vectorSize
// Propagate hidden -> output
var f = blas.sdot(vectorSize, syn0, l1, 1, syn1, l2, 1)
if (f > -MAX_EXP && f < MAX_EXP) {
@@ -330,10 +333,12 @@ class Word2Vec extends Serializable with Logging {
val g = ((1 - bcVocab.value(word).code(d) - f) * alpha).toFloat
blas.saxpy(vectorSize, g, syn1, l2, 1, neu1e, 0, 1)
blas.saxpy(vectorSize, g, syn0, l1, 1, syn1, l2, 1)
syn1Modify(inner) += 1
}
d += 1
}
blas.saxpy(vectorSize, 1.0f, neu1e, 0, 1, syn0, l1, 1)
syn0Modify(lastWord) += 1
}
}
a += 1
@@ -342,21 +347,36 @@ class Word2Vec extends Serializable with Logging {
}
(syn0, syn1, lwc, wc)
}
Iterator(model)
val syn0Local = model._1
val syn1Local = model._2
val synOut = new PrimitiveKeyOpenHashMap[Int, Array[Float]](vocabSize * 2)
var index = 0
while(index < vocabSize) {
if (syn0Modify(index) != 0) {
synOut.update(index, syn0Local.slice(index * vectorSize, (index + 1) * vectorSize))
}
if (syn1Modify(index) != 0) {
synOut.update(index + vocabSize,
syn1Local.slice(index * vectorSize, (index + 1) * vectorSize))
}
index += 1
}
Iterator(synOut)
}
val (aggSyn0, aggSyn1, _, _) =
partial.treeReduce { case ((syn0_1, syn1_1, lwc_1, wc_1), (syn0_2, syn1_2, lwc_2, wc_2)) =>
val n = syn0_1.length
val weight1 = 1.0f * wc_1 / (wc_1 + wc_2)
val weight2 = 1.0f * wc_2 / (wc_1 + wc_2)
blas.sscal(n, weight1, syn0_1, 1)
blas.sscal(n, weight1, syn1_1, 1)
blas.saxpy(n, weight2, syn0_2, 1, syn0_1, 1)
blas.saxpy(n, weight2, syn1_2, 1, syn1_1, 1)
(syn0_1, syn1_1, lwc_1 + lwc_2, wc_1 + wc_2)
val synAgg = partial.flatMap(x => x).reduceByKey { case (v1, v2) =>
blas.saxpy(vectorSize, 1.0f, v2, 1, v1, 1)
v1
}.collect()
var i = 0
while (i < synAgg.length) {
val index = synAgg(i)._1
if (index < vocabSize) {
Array.copy(synAgg(i)._2, 0, syn0Global, index * vectorSize, vectorSize)
} else {
Array.copy(synAgg(i)._2, 0, syn1Global, (index - vocabSize) * vectorSize, vectorSize)
}
syn0Global = aggSyn0
syn1Global = aggSyn1
i += 1
}
}
newSentences.unpersist()
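
For intuition, the keyed aggregation that replaces the dense treeReduce above can be sketched on
toy data (an illustration, not the MLlib code): each partition ships only the rows it actually
modified, keyed by row index, and rows sharing a key are summed element-wise.

// Row 0 was updated by two partitions; row 2 by one. Rows never touched are never shipped.
val updates = Seq(0 -> Array(1.0f, 2.0f), 2 -> Array(5.0f, 6.0f), 0 -> Array(3.0f, 4.0f))
val merged: Map[Int, Array[Float]] =
  updates.groupBy(_._1).map { case (row, kvs) =>
    row -> kvs.map(_._2).reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
  }
// merged(0) == Array(4.0f, 6.0f); merged(2) == Array(5.0f, 6.0f)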

@@ -414,15 +434,6 @@ class Word2VecModel private[mllib] (
}
}

/**
* Transforms an RDD to its vector representation
* @param dataset an RDD of words
* @return RDD of vector representation
*/
def transform(dataset: RDD[String]): RDD[Vector] = {
dataset.map(word => transform(word))
}

/**
* Find synonyms of a word
* @param word a word