Commit

SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions

This also removes an unnecessary tuple creation in cogroup.
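The rewrite is mechanical: closures that destructure each record with a `case (k, v)` pattern are replaced by direct `_1`/`_2` accessors on the pair, so the per-record pattern match disappears from hot code paths. A minimal sketch of the before/after shape, using a hypothetical summing loop rather than code from the patch:

// Before: destructures every element with a pattern match on Tuple2.
def sumValuesBefore(iter: Iterator[(String, Int)]): Int = {
  var total = 0
  iter.foreach { case (k, v) => total += v }
  total
}

// After: reads the tuple fields directly, mirroring the style this commit adopts.
def sumValuesAfter(iter: Iterator[(String, Int)]): Int = {
  var total = 0
  iter.foreach { pair => total += pair._2 }
  total
}

Both versions compute the same result; only the way each element is taken apart per iteration differs.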

Author: Sandy Ryza <sandy@cloudera.com>

Closes apache#1447 from sryza/sandy-spark-2519-2 and squashes the following commits:

b6d9699 [Sandy Ryza] Remove missed Tuple2 match in CoGroupedRDD
a109828 [Sandy Ryza] Remove another pattern matching in MappedValuesRDD and revert some changes in PairRDDFunctions
be10f8a [Sandy Ryza] SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions
sryza authored and rxin committed Jul 20, 2014
1 parent 4da01e3 commit 98ab411
Showing 3 changed files with 33 additions and 33 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -170,12 +170,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:

    val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
      val newCombiner = Array.fill(numRdds)(new CoGroup)
-     value match { case (v, depNum) => newCombiner(depNum) += v }
+     newCombiner(value._2) += value._1
      newCombiner
    }
    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
      (combiner, value) => {
-       value match { case (v, depNum) => combiner(depNum) += v }
+       combiner(value._2) += value._1
        combiner
      }
    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
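After this change the dependency index is read as `value._2` and the grouped value as `value._1`; the removed `case (v, depNum)` pattern shows that a CoGroupValue is exactly that (value, dependency index) pair. A standalone sketch of the combiner step, with illustrative type aliases that only approximate the private ones in CoGroupedRDD.scala:

import scala.collection.mutable.ArrayBuffer

object CoGroupSketch {
  // Illustrative stand-ins for CoGroupedRDD's private aliases (approximations, not the real definitions).
  type CoGroup = ArrayBuffer[Any]
  type CoGroupValue = (Any, Int)        // (grouped value, index of the source dependency)
  type CoGroupCombiner = Array[CoGroup]

  // Same accessor style as the patched createCombiner: route the value into the
  // buffer belonging to the dependency it came from.
  def createCombiner(numRdds: Int)(value: CoGroupValue): CoGroupCombiner = {
    val newCombiner = Array.fill(numRdds)(new CoGroup)
    newCombiner(value._2) += value._1
    newCombiner
  }
}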
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
@@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
  override val partitioner = firstParent[Product2[K, U]].partitioner

  override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
-   firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
+   firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) }
  }
}
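At this point in the codebase, MappedValuesRDD is the RDD returned by PairRDDFunctions.mapValues, so the map closure in compute runs once per record. A brief usage sketch (assuming an active SparkContext `sc`; collected ordering is illustrative):

// mapValues keeps each key and transforms only the value, going through the
// compute path shown above.
val scaled = sc.parallelize(Seq(("a", 1), ("b", 2))).mapValues(_ * 10)
// scaled.collect() => Array(("a", 10), ("b", 20))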
60 changes: 30 additions & 30 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

    val reducePartition = (iter: Iterator[(K, V)]) => {
      val map = new JHashMap[K, V]
-     iter.foreach { case (k, v) =>
-       val old = map.get(k)
-       map.put(k, if (old == null) v else func(old, v))
+     iter.foreach { pair =>
+       val old = map.get(pair._1)
+       map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
      }
      Iterator(map)
    } : Iterator[JHashMap[K, V]]

    val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
-     m2.foreach { case (k, v) =>
-       val old = m1.get(k)
-       m1.put(k, if (old == null) v else func(old, v))
+     m2.foreach { pair =>
+       val old = m1.get(pair._1)
+       m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
      }
      m1
    } : JHashMap[K, V]
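These two helpers are the per-partition accumulation and map-merge steps of what appears to be reduceByKeyLocally (the method header sits outside the visible hunk); after the change each record touches the JHashMap through plain field reads. A brief usage sketch (assuming an active SparkContext `sc`):

// reduceByKeyLocally reduces within each partition into a JHashMap, merges the
// per-partition maps, and returns the result to the driver as a Scala Map.
val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2))).reduceByKeyLocally(_ + _)
// counts => Map("a" -> 3, "b" -> 2)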
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
-   this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-     for (v <- vs; w <- ws) yield (v, w)
-   }
+   this.cogroup(other, partitioner).flatMapValues( pair =>
+     for (v <- pair._1; w <- pair._2) yield (v, w)
+   )
  }

/**
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   * partition the output RDD.
   */
  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
-   this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-     if (ws.isEmpty) {
-       vs.map(v => (v, None))
+   this.cogroup(other, partitioner).flatMapValues { pair =>
+     if (pair._2.isEmpty) {
+       pair._1.map(v => (v, None))
      } else {
-       for (v <- vs; w <- ws) yield (v, Some(w))
+       for (v <- pair._1; w <- pair._2) yield (v, Some(w))
      }
    }
  }
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   */
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Option[V], W))] = {
-   this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-     if (vs.isEmpty) {
-       ws.map(w => (None, w))
+   this.cogroup(other, partitioner).flatMapValues { pair =>
+     if (pair._1.isEmpty) {
+       pair._2.map(w => (None, w))
      } else {
-       for (v <- vs; w <- ws) yield (Some(v), w)
+       for (v <- pair._1; w <- pair._2) yield (Some(v), w)
      }
    }
  }
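All three joins above share the same shape: cogroup the two RDDs on the key, then expand each key's pair of value collections with flatMapValues. A short usage sketch of the resulting element types (assuming an active SparkContext `sc`; collected ordering is illustrative):

val left  = sc.parallelize(Seq(("a", 1), ("b", 2)))
val right = sc.parallelize(Seq(("a", "x")))

left.join(right).collect()           // Array(("a", (1, "x")))
left.leftOuterJoin(right).collect()  // Array(("a", (1, Some("x"))), ("b", (2, None)))
left.rightOuterJoin(right).collect() // Array(("a", (Some(1), "x")))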
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    val data = self.collect()
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
-   data.foreach { case (k, v) => map.put(k, v) }
+   data.foreach { pair => map.put(pair._1, pair._2) }
    map
  }

@@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    }
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
    cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-     (vs.asInstanceOf[Seq[V]],
-       w1s.asInstanceOf[Seq[W1]],
-       w2s.asInstanceOf[Seq[W2]],
-       w3s.asInstanceOf[Seq[W3]])
+     (vs.asInstanceOf[Seq[V]],
+       w1s.asInstanceOf[Seq[W1]],
+       w2s.asInstanceOf[Seq[W2]],
+       w3s.asInstanceOf[Seq[W3]])
    }
  }

@@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-   cg.mapValues { case Seq(vs, ws) =>
-     (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+   cg.mapValues { case Seq(vs, w1s) =>
+     (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
    }
  }

@@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
    cg.mapValues { case Seq(vs, w1s, w2s) =>
      (vs.asInstanceOf[Seq[V]],
-       w1s.asInstanceOf[Seq[W1]],
-       w2s.asInstanceOf[Seq[W2]])
+       w1s.asInstanceOf[Seq[W1]],
+       w2s.asInstanceOf[Seq[W2]])
    }
  }

@@ -712,8 +712,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    val index = p.getPartition(key)
    val process = (it: Iterator[(K, V)]) => {
      val buf = new ArrayBuffer[V]
-     for ((k, v) <- it if k == key) {
-       buf += v
+     for (pair <- it if pair._1 == key) {
+       buf += pair._2
      }
      buf
    } : Seq[V]
@@ -858,8 +858,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
      try {
        while (iter.hasNext) {
-         val (k, v) = iter.next()
-         writer.write(k, v)
+         val pair = iter.next()
+         writer.write(pair._1, pair._2)
        }
      } finally {
        writer.close(hadoopContext)
