Skip to content

Commit

Permalink
Fixed test, and renamed variable.
Browse files Browse the repository at this point in the history
  • Loading branch information
soumitrak committed Nov 11, 2014
1 parent 3da51a2 commit 9781135
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
preservePartitioning: Boolean,
initial : Option[RDD[(K, S)]]
initialRDD : Option[RDD[(K, S)]]
) extends DStream[(K, S)](parent.ssc) {

super.persist(StorageLevel.MEMORY_ONLY_SER)
Expand Down Expand Up @@ -95,7 +95,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
case Some(parentRDD) => { // If parent RDD exists, then compute as usual
initial match {
initialRDD match {
case None => {
// Define the function for the mapPartition operation on grouped RDD;
// first map the grouped tuple to tuples of required type,
Expand All @@ -110,8 +110,8 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// logDebug("Generating state RDD for time " + validTime + " (first)")
Some (sessionRDD)
}
case Some (initialRDD) => {
computeUsingPreviousRDD(parentRDD, initialRDD)
case Some (initialStateRDD) => {
computeUsingPreviousRDD(parentRDD, initialStateRDD)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1252,12 +1252,12 @@ public void testUpdateStateByKeyWithInitial() {
JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD (tmpRDD);

List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(new Tuple2<String, Integer>("california", 5),
new Tuple2<String, Integer>("new york", 7)),
Arrays.asList(new Tuple2<String, Integer>("california", 15),
new Tuple2<String, Integer>("new york", 11)),
Arrays.asList(new Tuple2<String, Integer>("california", 15),
new Tuple2<String, Integer>("new york", 11)));
Arrays.asList(new Tuple2<String, Integer>("new york", 7),
new Tuple2<String, Integer>("california", 5)),
Arrays.asList(new Tuple2<String, Integer>("new york", 11),
new Tuple2<String, Integer>("california", 15)),
Arrays.asList(new Tuple2<String, Integer>("new york", 11),
new Tuple2<String, Integer>("california", 15)));

JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
Expand Down

0 comments on commit 9781135

Please sign in to comment.